From c414ba457953277b3f69c1558bb51e659e921006 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 16 Jan 2026 12:48:56 -0700 Subject: [PATCH 1/9] fix --- benchmarks/pyspark/run_all_benchmarks.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/pyspark/run_all_benchmarks.sh b/benchmarks/pyspark/run_all_benchmarks.sh index 707d971f24..55491581f0 100755 --- a/benchmarks/pyspark/run_all_benchmarks.sh +++ b/benchmarks/pyspark/run_all_benchmarks.sh @@ -25,7 +25,7 @@ set -e SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" DATA_PATH="${1:-/tmp/shuffle-benchmark-data}" -COMET_JAR="${COMET_JAR:-$SCRIPT_DIR/../spark/target/comet-spark-spark3.5_2.12-0.13.0-SNAPSHOT.jar}" +COMET_JAR="${COMET_JAR:-$SCRIPT_DIR/../../spark/target/comet-spark-spark3.5_2.12-0.13.0-SNAPSHOT.jar}" SPARK_MASTER="${SPARK_MASTER:-local[*]}" EXECUTOR_MEMORY="${EXECUTOR_MEMORY:-16g}" EVENT_LOG_DIR="${EVENT_LOG_DIR:-/tmp/spark-events}" From 4404d25a028b427ba88fcca9d46bc5ea9d851b9b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 17 Jan 2026 19:39:38 -0700 Subject: [PATCH 2/9] perf: [WIP] optimize CometColumnarToRow for complex types This is a work-in-progress optimization for complex types (arrays and maps) in CometColumnarToRowExec. The changes include: Phase 1: Cache offset buffer addresses in Comet vectors - CometListVector and CometMapVector now cache the offset buffer memory address at construction time - getArray()/getMap() use Platform.getInt() for direct memory access instead of getOffsetBuffer().getInt() - This eliminates virtual method calls per-row Phase 2: Custom code generation for complex types - CometColumnarToRowExec now generates optimized code for ArrayType and MapType columns - Per-batch caching of offset buffer addresses and child vectors - Uses Platform.getInt() directly in generated code for offset access TODO: - Phase 3: Reusable wrapper objects (MutableColumnarArray/Map) - Benchmarking to measure performance improvements - Additional test coverage Co-Authored-By: Claude Opus 4.5 --- .../apache/comet/vector/CometListVector.java | 17 +- .../apache/comet/vector/CometMapVector.java | 22 +- .../sql/comet/CometColumnarToRowExec.scala | 203 +++++++++++++++++- 3 files changed, 230 insertions(+), 12 deletions(-) diff --git a/common/src/main/java/org/apache/comet/vector/CometListVector.java b/common/src/main/java/org/apache/comet/vector/CometListVector.java index 93e8e8bf9f..a16b40d0b9 100644 --- a/common/src/main/java/org/apache/comet/vector/CometListVector.java +++ b/common/src/main/java/org/apache/comet/vector/CometListVector.java @@ -25,6 +25,7 @@ import org.apache.arrow.vector.util.TransferPair; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.unsafe.Platform; /** A Comet column vector for list type. 
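 * Offsets into the child data vector are stored in Arrow's offset buffer as
 * consecutive 32-bit integers, so list {@code i} spans child positions
 * {@code [offsets[i], offsets[i+1])}. A minimal sketch of the direct read this patch
 * introduces, assuming the cached address stays valid for the lifetime of the batch:
 *
 * <pre>{@code
 * long addr = listVector.getOffsetBuffer().memoryAddress(); // cached at construction
 * int start = Platform.getInt(null, addr + (long) i * ListVector.OFFSET_WIDTH);
 * int end = Platform.getInt(null, addr + (long) (i + 1) * ListVector.OFFSET_WIDTH);
 * int len = end - start; // number of child elements in list i
 * }</pre>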
*/ public class CometListVector extends CometDecodedVector { @@ -32,6 +33,7 @@ public class CometListVector extends CometDecodedVector { final ValueVector dataVector; final ColumnVector dataColumnVector; final DictionaryProvider dictionaryProvider; + final long offsetBufferAddress; public CometListVector( ValueVector vector, boolean useDecimal128, DictionaryProvider dictionaryProvider) { @@ -41,13 +43,24 @@ public CometListVector( this.dataVector = listVector.getDataVector(); this.dictionaryProvider = dictionaryProvider; this.dataColumnVector = getVector(dataVector, useDecimal128, dictionaryProvider); + this.offsetBufferAddress = listVector.getOffsetBuffer().memoryAddress(); + } + + /** Returns the cached offset buffer memory address for direct access. */ + public long getOffsetBufferAddress() { + return offsetBufferAddress; + } + + /** Returns the wrapped data column vector for the array elements. */ + public ColumnVector getDataColumnVector() { + return dataColumnVector; } @Override public ColumnarArray getArray(int i) { if (isNullAt(i)) return null; - int start = listVector.getOffsetBuffer().getInt(i * ListVector.OFFSET_WIDTH); - int end = listVector.getOffsetBuffer().getInt((i + 1) * ListVector.OFFSET_WIDTH); + int start = Platform.getInt(null, offsetBufferAddress + (long) i * ListVector.OFFSET_WIDTH); + int end = Platform.getInt(null, offsetBufferAddress + (long) (i + 1) * ListVector.OFFSET_WIDTH); return new ColumnarArray(dataColumnVector, start, end - start); } diff --git a/common/src/main/java/org/apache/comet/vector/CometMapVector.java b/common/src/main/java/org/apache/comet/vector/CometMapVector.java index c5984a4dcb..107fc2294b 100644 --- a/common/src/main/java/org/apache/comet/vector/CometMapVector.java +++ b/common/src/main/java/org/apache/comet/vector/CometMapVector.java @@ -26,6 +26,7 @@ import org.apache.arrow.vector.util.TransferPair; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.Platform; /** A Comet column vector for map type. */ public class CometMapVector extends CometDecodedVector { @@ -33,6 +34,7 @@ public class CometMapVector extends CometDecodedVector { final ValueVector dataVector; final CometStructVector dataColumnVector; final DictionaryProvider dictionaryProvider; + final long offsetBufferAddress; final ColumnVector keys; final ColumnVector values; @@ -44,6 +46,7 @@ public CometMapVector( this.mapVector = ((MapVector) vector); this.dataVector = mapVector.getDataVector(); this.dictionaryProvider = dictionaryProvider; + this.offsetBufferAddress = mapVector.getOffsetBuffer().memoryAddress(); if (dataVector instanceof StructVector) { this.dataColumnVector = new CometStructVector(dataVector, useDecimal128, dictionaryProvider); @@ -63,11 +66,26 @@ public CometMapVector( } } + /** Returns the cached offset buffer memory address for direct access. */ + public long getOffsetBufferAddress() { + return offsetBufferAddress; + } + + /** Returns the wrapped column vector for map keys. */ + public ColumnVector getKeysVector() { + return keys; + } + + /** Returns the wrapped column vector for map values. 
*/ + public ColumnVector getValuesVector() { + return values; + } + @Override public ColumnarMap getMap(int i) { if (isNullAt(i)) return null; - int start = mapVector.getOffsetBuffer().getInt(i * MapVector.OFFSET_WIDTH); - int end = mapVector.getOffsetBuffer().getInt((i + 1) * MapVector.OFFSET_WIDTH); + int start = Platform.getInt(null, offsetBufferAddress + (long) i * MapVector.OFFSET_WIDTH); + int end = Platform.getInt(null, offsetBufferAddress + (long) (i + 1) * MapVector.OFFSET_WIDTH); return new ColumnarMap(keys, values, start, end - start); } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala index d965a6ff7b..ec4ed5e846 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.util.{SparkFatalException, Utils} import org.apache.spark.util.io.ChunkedByteBuffer -import org.apache.comet.vector.CometPlainVector +import org.apache.comet.vector.{CometListVector, CometMapVector, CometPlainVector} /** * Copied from Spark `ColumnarToRowExec`. Comet needs the fix for SPARK-50235 but cannot wait for @@ -209,6 +209,107 @@ case class CometColumnarToRowExec(child: SparkPlan) ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType)) } + /** + * Generate optimized code for ArrayType columns using Comet's direct memory access. This caches + * the offset buffer address and data vector per-batch to avoid repeated method calls per-row. + */ + private def genCodeForCometArray( + ctx: CodegenContext, + columnVar: String, + ordinal: String, + offsetAddrVar: String, + dataColVar: String, + dataType: DataType, + nullable: Boolean): ExprCode = { + val columnarArrayClz = "org.apache.spark.sql.vectorized.ColumnarArray" + val platformClz = "org.apache.spark.unsafe.Platform" + + val isNullVar = if (nullable) { + JavaCode.isNullVariable(ctx.freshName("isNull")) + } else { + FalseLiteral + } + val valueVar = ctx.freshName("value") + val startVar = ctx.freshName("start") + val endVar = ctx.freshName("end") + val lenVar = ctx.freshName("len") + + val str = s"cometArrayVector[$columnVar, $ordinal]" + // scalastyle:off line.size.limit + val code = code"${ctx.registerComment(str)}" + (if (nullable) { + code""" + boolean $isNullVar = $columnVar.isNullAt($ordinal); + $columnarArrayClz $valueVar = null; + if (!$isNullVar) { + int $startVar = $platformClz.getInt(null, $offsetAddrVar + (long) $ordinal * 4L); + int $endVar = $platformClz.getInt(null, $offsetAddrVar + (long) ($ordinal + 1) * 4L); + int $lenVar = $endVar - $startVar; + $valueVar = new $columnarArrayClz($dataColVar, $startVar, $lenVar); + } + """ + } else { + code""" + int $startVar = $platformClz.getInt(null, $offsetAddrVar + (long) $ordinal * 4L); + int $endVar = $platformClz.getInt(null, $offsetAddrVar + (long) ($ordinal + 1) * 4L); + int $lenVar = $endVar - $startVar; + $columnarArrayClz $valueVar = new $columnarArrayClz($dataColVar, $startVar, $lenVar); + """ + }) + // scalastyle:on line.size.limit + ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType)) + } + + /** + * Generate optimized code for MapType columns using Comet's direct memory access. This caches + * the offset buffer address, keys vector, and values vector per-batch. 
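+   * For reference, the nullable branch below compiles to Java of roughly this shape
+   * (variable names are illustrative):
+   *
+   * {{{
+   * boolean isNull = colInstance0.isNullAt(rowIdx);
+   * ColumnarMap value = null;
+   * if (!isNull) {
+   *   int start = Platform.getInt(null, mapOffsetAddr0 + (long) rowIdx * 4L);
+   *   int end = Platform.getInt(null, mapOffsetAddr0 + (long) (rowIdx + 1) * 4L);
+   *   value = new ColumnarMap(mapKeysCol0, mapValuesCol0, start, end - start);
+   * }
+   * }}}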
+ */ + private def genCodeForCometMap( + ctx: CodegenContext, + columnVar: String, + ordinal: String, + offsetAddrVar: String, + keysColVar: String, + valuesColVar: String, + dataType: DataType, + nullable: Boolean): ExprCode = { + val columnarMapClz = "org.apache.spark.sql.vectorized.ColumnarMap" + val platformClz = "org.apache.spark.unsafe.Platform" + + val isNullVar = if (nullable) { + JavaCode.isNullVariable(ctx.freshName("isNull")) + } else { + FalseLiteral + } + val valueVar = ctx.freshName("value") + val startVar = ctx.freshName("start") + val endVar = ctx.freshName("end") + val lenVar = ctx.freshName("len") + + val str = s"cometMapVector[$columnVar, $ordinal]" + // scalastyle:off line.size.limit + val code = code"${ctx.registerComment(str)}" + (if (nullable) { + code""" + boolean $isNullVar = $columnVar.isNullAt($ordinal); + $columnarMapClz $valueVar = null; + if (!$isNullVar) { + int $startVar = $platformClz.getInt(null, $offsetAddrVar + (long) $ordinal * 4L); + int $endVar = $platformClz.getInt(null, $offsetAddrVar + (long) ($ordinal + 1) * 4L); + int $lenVar = $endVar - $startVar; + $valueVar = new $columnarMapClz($keysColVar, $valuesColVar, $startVar, $lenVar); + } + """ + } else { + code""" + int $startVar = $platformClz.getInt(null, $offsetAddrVar + (long) $ordinal * 4L); + int $endVar = $platformClz.getInt(null, $offsetAddrVar + (long) ($ordinal + 1) * 4L); + int $lenVar = $endVar - $startVar; + $columnarMapClz $valueVar = new $columnarMapClz($keysColVar, $valuesColVar, $startVar, $lenVar); + """ + }) + // scalastyle:on line.size.limit + ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType)) + } + /** * Produce code to process the input iterator as [[ColumnarBatch]]es. This produces an * [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] for each row in each batch. @@ -227,11 +328,72 @@ case class CometColumnarToRowExec(child: SparkPlan) val idx = ctx.addMutableState(CodeGenerator.JAVA_INT, "batchIdx") // init as batchIdx = 0 val columnVectorClzs = child.vectorTypes.getOrElse(Seq.fill(output.indices.size)(classOf[ColumnVector].getName)) - val (colVars, columnAssigns) = columnVectorClzs.zipWithIndex.map { - case (columnVectorClz, i) => - val name = ctx.addMutableState(columnVectorClz, s"colInstance$i") - (name, s"$name = ($columnVectorClz) $batch.column($i);") - }.unzip + val columnVectorClz = classOf[ColumnVector].getName + val cometListVectorClz = classOf[CometListVector].getName + val cometMapVectorClz = classOf[CometMapVector].getName + + // For each column, create mutable state and assignment code. + // For ArrayType and MapType, also create cached state for offset addresses and child vectors. 
+ case class ColumnInfo( + colVar: String, + assignCode: String, + dataType: DataType, + nullable: Boolean, + // For ArrayType: (offsetAddrVar, dataColVar) + arrayInfo: Option[(String, String)] = None, + // For MapType: (offsetAddrVar, keysColVar, valuesColVar) + mapInfo: Option[(String, String, String)] = None) + + val columnInfos = output.zipWithIndex.map { case (attr, i) => + val colVarName = ctx.addMutableState(columnVectorClzs(i), s"colInstance$i") + val baseAssign = s"$colVarName = (${columnVectorClzs(i)}) $batch.column($i);" + + attr.dataType match { + case _: ArrayType => + val offsetAddrVar = ctx.addMutableState("long", s"arrayOffsetAddr$i") + val dataColVar = ctx.addMutableState(columnVectorClz, s"arrayDataCol$i") + val extraAssign = + s""" + |if ($colVarName instanceof $cometListVectorClz) { + | $cometListVectorClz cometList$i = ($cometListVectorClz) $colVarName; + | $offsetAddrVar = cometList$i.getOffsetBufferAddress(); + | $dataColVar = cometList$i.getDataColumnVector(); + |} + """.stripMargin + ColumnInfo( + colVarName, + baseAssign + extraAssign, + attr.dataType, + attr.nullable, + arrayInfo = Some((offsetAddrVar, dataColVar))) + + case _: MapType => + val offsetAddrVar = ctx.addMutableState("long", s"mapOffsetAddr$i") + val keysColVar = ctx.addMutableState(columnVectorClz, s"mapKeysCol$i") + val valuesColVar = ctx.addMutableState(columnVectorClz, s"mapValuesCol$i") + val extraAssign = + s""" + |if ($colVarName instanceof $cometMapVectorClz) { + | $cometMapVectorClz cometMap$i = ($cometMapVectorClz) $colVarName; + | $offsetAddrVar = cometMap$i.getOffsetBufferAddress(); + | $keysColVar = cometMap$i.getKeysVector(); + | $valuesColVar = cometMap$i.getValuesVector(); + |} + """.stripMargin + ColumnInfo( + colVarName, + baseAssign + extraAssign, + attr.dataType, + attr.nullable, + mapInfo = Some((offsetAddrVar, keysColVar, valuesColVar))) + + case _ => + ColumnInfo(colVarName, baseAssign, attr.dataType, attr.nullable) + } + } + + val colVars = columnInfos.map(_.colVar) + val columnAssigns = columnInfos.map(_.assignCode) val nextBatch = ctx.freshName("nextBatch") val nextBatchFuncName = ctx.addNewFunction( @@ -249,8 +411,33 @@ case class CometColumnarToRowExec(child: SparkPlan) ctx.currentVars = null val rowidx = ctx.freshName("rowIdx") - val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => - genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) + val columnsBatchInput = columnInfos.map { info => + (info.arrayInfo, info.mapInfo) match { + case (Some((offsetAddrVar, dataColVar)), _) => + // Use optimized code generation for ArrayType + genCodeForCometArray( + ctx, + info.colVar, + rowidx, + offsetAddrVar, + dataColVar, + info.dataType, + info.nullable) + case (_, Some((offsetAddrVar, keysColVar, valuesColVar))) => + // Use optimized code generation for MapType + genCodeForCometMap( + ctx, + info.colVar, + rowidx, + offsetAddrVar, + keysColVar, + valuesColVar, + info.dataType, + info.nullable) + case _ => + // Use standard code generation for other types + genCodeColumnVector(ctx, info.colVar, rowidx, info.dataType, info.nullable) + } } val localIdx = ctx.freshName("localIdx") val localEnd = ctx.freshName("localEnd") From 0d45b8f114b86ff04178363cffbfc5d500d0ab1b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 17 Jan 2026 19:51:22 -0700 Subject: [PATCH 3/9] perf: add reusable wrapper objects to eliminate per-row allocation Phase 3 of CometColumnarToRow optimization: - Add CometColumnarArray: mutable ArrayData impl that allows updating 
offset/length without creating new objects - Add CometColumnarMap: mutable MapData impl with same reusability - Update code generation to create wrapper once per batch and reuse across all rows via update() calls This eliminates object allocation per-row for array and map columns, reducing GC pressure. Co-Authored-By: Claude Opus 4.5 --- .../comet/vector/CometColumnarArray.java | 186 ++++++++++++++++++ .../apache/comet/vector/CometColumnarMap.java | 75 +++++++ .../sql/comet/CometColumnarToRowExec.scala | 81 ++++---- 3 files changed, 309 insertions(+), 33 deletions(-) create mode 100644 common/src/main/java/org/apache/comet/vector/CometColumnarArray.java create mode 100644 common/src/main/java/org/apache/comet/vector/CometColumnarMap.java diff --git a/common/src/main/java/org/apache/comet/vector/CometColumnarArray.java b/common/src/main/java/org/apache/comet/vector/CometColumnarArray.java new file mode 100644 index 0000000000..c370c82273 --- /dev/null +++ b/common/src/main/java/org/apache/comet/vector/CometColumnarArray.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.vector; + +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A mutable implementation of ArrayData backed by a ColumnVector. Unlike Spark's ColumnarArray + * which has final fields, this class allows updating the offset and length to enable object reuse + * across rows, reducing GC pressure. + */ +public class CometColumnarArray extends ArrayData { + private ColumnVector data; + private int offset; + private int length; + + public CometColumnarArray(ColumnVector data) { + this.data = data; + this.offset = 0; + this.length = 0; + } + + public CometColumnarArray(ColumnVector data, int offset, int length) { + this.data = data; + this.offset = offset; + this.length = length; + } + + /** Updates this array to point to a new slice of the underlying data. */ + public void update(int offset, int length) { + this.offset = offset; + this.length = length; + } + + /** Updates both the data vector and the slice. 
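+   * A minimal sketch of the intended per-batch reuse, where {@code childVector},
+   * {@code starts}, {@code lengths}, and {@code consume} are illustrative
+   * placeholders. Every row aliases the same instance, so the consumer must copy or
+   * serialize the slice before the next update:
+   *
+   * <pre>{@code
+   * CometColumnarArray arr = new CometColumnarArray(childVector); // once per batch
+   * for (int row = 0; row < numRows; row++) {
+   *   arr.update(starts[row], lengths[row]); // repoint, no allocation
+   *   consume(arr); // must fully read the slice before the next update
+   * }
+   * }</pre>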
*/ + public void update(ColumnVector data, int offset, int length) { + this.data = data; + this.offset = offset; + this.length = length; + } + + @Override + public int numElements() { + return length; + } + + @Override + public ArrayData copy() { + Object[] values = new Object[length]; + for (int i = 0; i < length; i++) { + if (!isNullAt(i)) { + values[i] = get(i, data.dataType()); + } + } + return new GenericArrayData(values); + } + + @Override + public Object[] array() { + DataType dt = data.dataType(); + Object[] values = new Object[length]; + for (int i = 0; i < length; i++) { + if (!isNullAt(i)) { + values[i] = get(i, dt); + } + } + return values; + } + + @Override + public void setNullAt(int i) { + throw new UnsupportedOperationException("CometColumnarArray is read-only"); + } + + @Override + public void update(int i, Object value) { + throw new UnsupportedOperationException("CometColumnarArray is read-only"); + } + + @Override + public boolean isNullAt(int ordinal) { + return data.isNullAt(offset + ordinal); + } + + @Override + public boolean getBoolean(int ordinal) { + return data.getBoolean(offset + ordinal); + } + + @Override + public byte getByte(int ordinal) { + return data.getByte(offset + ordinal); + } + + @Override + public short getShort(int ordinal) { + return data.getShort(offset + ordinal); + } + + @Override + public int getInt(int ordinal) { + return data.getInt(offset + ordinal); + } + + @Override + public long getLong(int ordinal) { + return data.getLong(offset + ordinal); + } + + @Override + public float getFloat(int ordinal) { + return data.getFloat(offset + ordinal); + } + + @Override + public double getDouble(int ordinal) { + return data.getDouble(offset + ordinal); + } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) { + return data.getDecimal(offset + ordinal, precision, scale); + } + + @Override + public UTF8String getUTF8String(int ordinal) { + return data.getUTF8String(offset + ordinal); + } + + @Override + public byte[] getBinary(int ordinal) { + return data.getBinary(offset + ordinal); + } + + @Override + public CalendarInterval getInterval(int ordinal) { + return data.getInterval(offset + ordinal); + } + + @Override + public ArrayData getArray(int ordinal) { + return data.getArray(offset + ordinal); + } + + @Override + public org.apache.spark.sql.catalyst.util.MapData getMap(int ordinal) { + return data.getMap(offset + ordinal); + } + + @Override + public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numFields) { + return data.getStruct(offset + ordinal); + } + + @Override + public Object get(int ordinal, DataType dataType) { + if (isNullAt(ordinal)) { + return null; + } + return org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader.read( + this, ordinal, dataType, true, true); + } +} diff --git a/common/src/main/java/org/apache/comet/vector/CometColumnarMap.java b/common/src/main/java/org/apache/comet/vector/CometColumnarMap.java new file mode 100644 index 0000000000..45847cba40 --- /dev/null +++ b/common/src/main/java/org/apache/comet/vector/CometColumnarMap.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.vector; + +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.vectorized.ColumnVector; + +/** + * A mutable implementation of MapData backed by ColumnVectors. Unlike Spark's ColumnarMap which has + * final fields, this class allows updating the offset and length to enable object reuse across + * rows, reducing GC pressure. + */ +public class CometColumnarMap extends MapData { + private final CometColumnarArray keys; + private final CometColumnarArray values; + private int length; + + public CometColumnarMap(ColumnVector keysData, ColumnVector valuesData) { + this.keys = new CometColumnarArray(keysData); + this.values = new CometColumnarArray(valuesData); + this.length = 0; + } + + public CometColumnarMap(ColumnVector keysData, ColumnVector valuesData, int offset, int length) { + this.keys = new CometColumnarArray(keysData, offset, length); + this.values = new CometColumnarArray(valuesData, offset, length); + this.length = length; + } + + /** Updates this map to point to a new slice of the underlying data. */ + public void update(int offset, int length) { + this.keys.update(offset, length); + this.values.update(offset, length); + this.length = length; + } + + @Override + public int numElements() { + return length; + } + + @Override + public ArrayData keyArray() { + return keys; + } + + @Override + public ArrayData valueArray() { + return values; + } + + @Override + public MapData copy() { + return new ArrayBasedMapData(keys.copy(), values.copy()); + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala index ec4ed5e846..0e1c6600c1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.util.{SparkFatalException, Utils} import org.apache.spark.util.io.ChunkedByteBuffer -import org.apache.comet.vector.{CometListVector, CometMapVector, CometPlainVector} +import org.apache.comet.vector.{CometColumnarArray, CometColumnarMap, CometListVector, CometMapVector, CometPlainVector} /** * Copied from Spark `ColumnarToRowExec`. Comet needs the fix for SPARK-50235 but cannot wait for @@ -211,17 +211,18 @@ case class CometColumnarToRowExec(child: SparkPlan) /** * Generate optimized code for ArrayType columns using Comet's direct memory access. This caches - * the offset buffer address and data vector per-batch to avoid repeated method calls per-row. + * the offset buffer address and data vector per-batch, and reuses a CometColumnarArray instance + * to avoid object allocation per-row. 
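+   * Because the same wrapper instance is returned for every row, this is only safe
+   * when the consumer (here, the UnsafeRow projection) fully serializes the value
+   * before the next row is produced.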
*/ private def genCodeForCometArray( ctx: CodegenContext, columnVar: String, ordinal: String, offsetAddrVar: String, - dataColVar: String, + reusableArrayVar: String, dataType: DataType, nullable: Boolean): ExprCode = { - val columnarArrayClz = "org.apache.spark.sql.vectorized.ColumnarArray" + val cometArrayClz = classOf[CometColumnarArray].getName val platformClz = "org.apache.spark.unsafe.Platform" val isNullVar = if (nullable) { @@ -239,12 +240,13 @@ case class CometColumnarToRowExec(child: SparkPlan) val code = code"${ctx.registerComment(str)}" + (if (nullable) { code""" boolean $isNullVar = $columnVar.isNullAt($ordinal); - $columnarArrayClz $valueVar = null; + $cometArrayClz $valueVar = null; if (!$isNullVar) { int $startVar = $platformClz.getInt(null, $offsetAddrVar + (long) $ordinal * 4L); int $endVar = $platformClz.getInt(null, $offsetAddrVar + (long) ($ordinal + 1) * 4L); int $lenVar = $endVar - $startVar; - $valueVar = new $columnarArrayClz($dataColVar, $startVar, $lenVar); + $reusableArrayVar.update($startVar, $lenVar); + $valueVar = $reusableArrayVar; } """ } else { @@ -252,7 +254,8 @@ case class CometColumnarToRowExec(child: SparkPlan) int $startVar = $platformClz.getInt(null, $offsetAddrVar + (long) $ordinal * 4L); int $endVar = $platformClz.getInt(null, $offsetAddrVar + (long) ($ordinal + 1) * 4L); int $lenVar = $endVar - $startVar; - $columnarArrayClz $valueVar = new $columnarArrayClz($dataColVar, $startVar, $lenVar); + $reusableArrayVar.update($startVar, $lenVar); + $cometArrayClz $valueVar = $reusableArrayVar; """ }) // scalastyle:on line.size.limit @@ -261,18 +264,18 @@ case class CometColumnarToRowExec(child: SparkPlan) /** * Generate optimized code for MapType columns using Comet's direct memory access. This caches - * the offset buffer address, keys vector, and values vector per-batch. + * the offset buffer address, keys vector, and values vector per-batch, and reuses a + * CometColumnarMap instance to avoid object allocation per-row. 
*/ private def genCodeForCometMap( ctx: CodegenContext, columnVar: String, ordinal: String, offsetAddrVar: String, - keysColVar: String, - valuesColVar: String, + reusableMapVar: String, dataType: DataType, nullable: Boolean): ExprCode = { - val columnarMapClz = "org.apache.spark.sql.vectorized.ColumnarMap" + val cometMapClz = classOf[CometColumnarMap].getName val platformClz = "org.apache.spark.unsafe.Platform" val isNullVar = if (nullable) { @@ -290,12 +293,13 @@ case class CometColumnarToRowExec(child: SparkPlan) val code = code"${ctx.registerComment(str)}" + (if (nullable) { code""" boolean $isNullVar = $columnVar.isNullAt($ordinal); - $columnarMapClz $valueVar = null; + $cometMapClz $valueVar = null; if (!$isNullVar) { int $startVar = $platformClz.getInt(null, $offsetAddrVar + (long) $ordinal * 4L); int $endVar = $platformClz.getInt(null, $offsetAddrVar + (long) ($ordinal + 1) * 4L); int $lenVar = $endVar - $startVar; - $valueVar = new $columnarMapClz($keysColVar, $valuesColVar, $startVar, $lenVar); + $reusableMapVar.update($startVar, $lenVar); + $valueVar = $reusableMapVar; } """ } else { @@ -303,7 +307,8 @@ case class CometColumnarToRowExec(child: SparkPlan) int $startVar = $platformClz.getInt(null, $offsetAddrVar + (long) $ordinal * 4L); int $endVar = $platformClz.getInt(null, $offsetAddrVar + (long) ($ordinal + 1) * 4L); int $lenVar = $endVar - $startVar; - $columnarMapClz $valueVar = new $columnarMapClz($keysColVar, $valuesColVar, $startVar, $lenVar); + $reusableMapVar.update($startVar, $lenVar); + $cometMapClz $valueVar = $reusableMapVar; """ }) // scalastyle:on line.size.limit @@ -331,18 +336,21 @@ case class CometColumnarToRowExec(child: SparkPlan) val columnVectorClz = classOf[ColumnVector].getName val cometListVectorClz = classOf[CometListVector].getName val cometMapVectorClz = classOf[CometMapVector].getName + val cometArrayClz = classOf[CometColumnarArray].getName + val cometMapClz = classOf[CometColumnarMap].getName // For each column, create mutable state and assignment code. - // For ArrayType and MapType, also create cached state for offset addresses and child vectors. + // For ArrayType and MapType, also create cached state for offset addresses, child vectors, + // and reusable wrapper objects. 
case class ColumnInfo( colVar: String, assignCode: String, dataType: DataType, nullable: Boolean, - // For ArrayType: (offsetAddrVar, dataColVar) + // For ArrayType: (offsetAddrVar, reusableArrayVar) arrayInfo: Option[(String, String)] = None, - // For MapType: (offsetAddrVar, keysColVar, valuesColVar) - mapInfo: Option[(String, String, String)] = None) + // For MapType: (offsetAddrVar, reusableMapVar) + mapInfo: Option[(String, String)] = None) val columnInfos = output.zipWithIndex.map { case (attr, i) => val colVarName = ctx.addMutableState(columnVectorClzs(i), s"colInstance$i") @@ -351,41 +359,49 @@ case class CometColumnarToRowExec(child: SparkPlan) attr.dataType match { case _: ArrayType => val offsetAddrVar = ctx.addMutableState("long", s"arrayOffsetAddr$i") - val dataColVar = ctx.addMutableState(columnVectorClz, s"arrayDataCol$i") + val dataColVar = ctx.freshName(s"arrayDataCol$i") + val reusableArrayVar = ctx.addMutableState(cometArrayClz, s"reusableArray$i") + // scalastyle:off line.size.limit val extraAssign = s""" |if ($colVarName instanceof $cometListVectorClz) { | $cometListVectorClz cometList$i = ($cometListVectorClz) $colVarName; | $offsetAddrVar = cometList$i.getOffsetBufferAddress(); - | $dataColVar = cometList$i.getDataColumnVector(); + | $columnVectorClz $dataColVar = cometList$i.getDataColumnVector(); + | $reusableArrayVar = new $cometArrayClz($dataColVar); |} """.stripMargin + // scalastyle:on line.size.limit ColumnInfo( colVarName, baseAssign + extraAssign, attr.dataType, attr.nullable, - arrayInfo = Some((offsetAddrVar, dataColVar))) + arrayInfo = Some((offsetAddrVar, reusableArrayVar))) case _: MapType => val offsetAddrVar = ctx.addMutableState("long", s"mapOffsetAddr$i") - val keysColVar = ctx.addMutableState(columnVectorClz, s"mapKeysCol$i") - val valuesColVar = ctx.addMutableState(columnVectorClz, s"mapValuesCol$i") + val keysColVar = ctx.freshName(s"mapKeysCol$i") + val valuesColVar = ctx.freshName(s"mapValuesCol$i") + val reusableMapVar = ctx.addMutableState(cometMapClz, s"reusableMap$i") + // scalastyle:off line.size.limit val extraAssign = s""" |if ($colVarName instanceof $cometMapVectorClz) { | $cometMapVectorClz cometMap$i = ($cometMapVectorClz) $colVarName; | $offsetAddrVar = cometMap$i.getOffsetBufferAddress(); - | $keysColVar = cometMap$i.getKeysVector(); - | $valuesColVar = cometMap$i.getValuesVector(); + | $columnVectorClz $keysColVar = cometMap$i.getKeysVector(); + | $columnVectorClz $valuesColVar = cometMap$i.getValuesVector(); + | $reusableMapVar = new $cometMapClz($keysColVar, $valuesColVar); |} """.stripMargin + // scalastyle:on line.size.limit ColumnInfo( colVarName, baseAssign + extraAssign, attr.dataType, attr.nullable, - mapInfo = Some((offsetAddrVar, keysColVar, valuesColVar))) + mapInfo = Some((offsetAddrVar, reusableMapVar))) case _ => ColumnInfo(colVarName, baseAssign, attr.dataType, attr.nullable) @@ -413,25 +429,24 @@ case class CometColumnarToRowExec(child: SparkPlan) val rowidx = ctx.freshName("rowIdx") val columnsBatchInput = columnInfos.map { info => (info.arrayInfo, info.mapInfo) match { - case (Some((offsetAddrVar, dataColVar)), _) => - // Use optimized code generation for ArrayType + case (Some((offsetAddrVar, reusableArrayVar)), _) => + // Use optimized code generation for ArrayType with reusable wrapper genCodeForCometArray( ctx, info.colVar, rowidx, offsetAddrVar, - dataColVar, + reusableArrayVar, info.dataType, info.nullable) - case (_, Some((offsetAddrVar, keysColVar, valuesColVar))) => - // Use optimized code generation for 
MapType + case (_, Some((offsetAddrVar, reusableMapVar))) => + // Use optimized code generation for MapType with reusable wrapper genCodeForCometMap( ctx, info.colVar, rowidx, offsetAddrVar, - keysColVar, - valuesColVar, + reusableMapVar, info.dataType, info.nullable) case _ => From 8333c124312b93d9b84542e48ff24fbdf7afa471 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 17 Jan 2026 20:02:09 -0700 Subject: [PATCH 4/9] perf: add reusable row wrapper for StructType columns Phase 4 of CometColumnarToRow optimization: - Add CometColumnarRow: mutable InternalRow impl that allows updating the rowId without creating new objects - Update code generation to detect StructType columns and use CometColumnarRow wrappers created once per batch This eliminates object allocation per-row for struct columns, reducing GC pressure. Co-Authored-By: Claude Opus 4.5 --- .../apache/comet/vector/CometColumnarRow.java | 177 ++++++++++++++++++ .../sql/comet/CometColumnarToRowExec.scala | 83 +++++++- 2 files changed, 253 insertions(+), 7 deletions(-) create mode 100644 common/src/main/java/org/apache/comet/vector/CometColumnarRow.java diff --git a/common/src/main/java/org/apache/comet/vector/CometColumnarRow.java b/common/src/main/java/org/apache/comet/vector/CometColumnarRow.java new file mode 100644 index 0000000000..9e85a3f6b6 --- /dev/null +++ b/common/src/main/java/org/apache/comet/vector/CometColumnarRow.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.vector; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A mutable implementation of InternalRow backed by a ColumnVector for struct types. Unlike Spark's + * ColumnarRow which has final fields, this class allows updating the rowId to enable object reuse + * across rows, reducing GC pressure. 
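+ * A minimal sketch of the intended reuse, where {@code structVector} and
+ * {@code consume} are illustrative placeholders:
+ *
+ * <pre>{@code
+ * CometColumnarRow row = new CometColumnarRow(structVector); // once per batch
+ * for (int r = 0; r < numRows; r++) {
+ *   row.update(r);  // repoint to row r, no allocation
+ *   consume(row);   // must copy or serialize before the next update
+ * }
+ * }</pre>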
+ */ +public class CometColumnarRow extends InternalRow { + private ColumnVector data; + private int rowId; + private int numFields; + + public CometColumnarRow(ColumnVector data) { + this.data = data; + this.rowId = 0; + this.numFields = ((StructType) data.dataType()).size(); + } + + public CometColumnarRow(ColumnVector data, int rowId) { + this.data = data; + this.rowId = rowId; + this.numFields = ((StructType) data.dataType()).size(); + } + + /** Updates this row to point to a different row in the underlying data. */ + public void update(int rowId) { + this.rowId = rowId; + } + + @Override + public int numFields() { + return numFields; + } + + @Override + public void setNullAt(int i) { + throw new UnsupportedOperationException("CometColumnarRow is read-only"); + } + + @Override + public void update(int i, Object value) { + throw new UnsupportedOperationException("CometColumnarRow is read-only"); + } + + @Override + public InternalRow copy() { + GenericInternalRow row = new GenericInternalRow(numFields); + for (int i = 0; i < numFields; i++) { + if (isNullAt(i)) { + row.setNullAt(i); + } else { + DataType dt = data.getChild(i).dataType(); + row.update(i, get(i, dt)); + } + } + return row; + } + + @Override + public boolean anyNull() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isNullAt(int ordinal) { + return data.getChild(ordinal).isNullAt(rowId); + } + + @Override + public boolean getBoolean(int ordinal) { + return data.getChild(ordinal).getBoolean(rowId); + } + + @Override + public byte getByte(int ordinal) { + return data.getChild(ordinal).getByte(rowId); + } + + @Override + public short getShort(int ordinal) { + return data.getChild(ordinal).getShort(rowId); + } + + @Override + public int getInt(int ordinal) { + return data.getChild(ordinal).getInt(rowId); + } + + @Override + public long getLong(int ordinal) { + return data.getChild(ordinal).getLong(rowId); + } + + @Override + public float getFloat(int ordinal) { + return data.getChild(ordinal).getFloat(rowId); + } + + @Override + public double getDouble(int ordinal) { + return data.getChild(ordinal).getDouble(rowId); + } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) { + return data.getChild(ordinal).getDecimal(rowId, precision, scale); + } + + @Override + public UTF8String getUTF8String(int ordinal) { + return data.getChild(ordinal).getUTF8String(rowId); + } + + @Override + public byte[] getBinary(int ordinal) { + return data.getChild(ordinal).getBinary(rowId); + } + + @Override + public CalendarInterval getInterval(int ordinal) { + return data.getChild(ordinal).getInterval(rowId); + } + + @Override + public InternalRow getStruct(int ordinal, int numFields) { + return data.getChild(ordinal).getStruct(rowId); + } + + @Override + public ArrayData getArray(int ordinal) { + return data.getChild(ordinal).getArray(rowId); + } + + @Override + public MapData getMap(int ordinal) { + return data.getChild(ordinal).getMap(rowId); + } + + @Override + public Object get(int ordinal, DataType dataType) { + if (isNullAt(ordinal)) { + return null; + } + return org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader.read( + this, ordinal, dataType, true, true); + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala index 0e1c6600c1..4d48b7eda0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala +++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.util.{SparkFatalException, Utils} import org.apache.spark.util.io.ChunkedByteBuffer -import org.apache.comet.vector.{CometColumnarArray, CometColumnarMap, CometListVector, CometMapVector, CometPlainVector} +import org.apache.comet.vector.{CometColumnarArray, CometColumnarMap, CometColumnarRow, CometListVector, CometMapVector, CometPlainVector, CometStructVector} /** * Copied from Spark `ColumnarToRowExec`. Comet needs the fix for SPARK-50235 but cannot wait for @@ -315,6 +315,45 @@ case class CometColumnarToRowExec(child: SparkPlan) ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType)) } + /** + * Generate optimized code for StructType columns using a reusable CometColumnarRow. This avoids + * creating a new ColumnarRow object per-row. + */ + private def genCodeForCometStruct( + ctx: CodegenContext, + columnVar: String, + ordinal: String, + reusableRowVar: String, + dataType: DataType, + nullable: Boolean): ExprCode = { + val cometRowClz = classOf[CometColumnarRow].getName + + val isNullVar = if (nullable) { + JavaCode.isNullVariable(ctx.freshName("isNull")) + } else { + FalseLiteral + } + val valueVar = ctx.freshName("value") + + val str = s"cometStructVector[$columnVar, $ordinal]" + val code = code"${ctx.registerComment(str)}" + (if (nullable) { + code""" + boolean $isNullVar = $columnVar.isNullAt($ordinal); + $cometRowClz $valueVar = null; + if (!$isNullVar) { + $reusableRowVar.update($ordinal); + $valueVar = $reusableRowVar; + } + """ + } else { + code""" + $reusableRowVar.update($ordinal); + $cometRowClz $valueVar = $reusableRowVar; + """ + }) + ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType)) + } + /** * Produce code to process the input iterator as [[ColumnarBatch]]es. This produces an * [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] for each row in each batch. @@ -336,12 +375,14 @@ case class CometColumnarToRowExec(child: SparkPlan) val columnVectorClz = classOf[ColumnVector].getName val cometListVectorClz = classOf[CometListVector].getName val cometMapVectorClz = classOf[CometMapVector].getName + val cometStructVectorClz = classOf[CometStructVector].getName val cometArrayClz = classOf[CometColumnarArray].getName val cometMapClz = classOf[CometColumnarMap].getName + val cometRowClz = classOf[CometColumnarRow].getName // For each column, create mutable state and assignment code. - // For ArrayType and MapType, also create cached state for offset addresses, child vectors, - // and reusable wrapper objects. + // For ArrayType, MapType, and StructType, also create cached state for offset addresses, + // child vectors, and reusable wrapper objects. 
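+    // The struct fast path only needs the reusable row wrapper; unlike arrays and
+    // maps there is no offset buffer to cache, since struct fields are addressed
+    // directly by rowId.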
case class ColumnInfo( colVar: String, assignCode: String, @@ -350,7 +391,9 @@ case class CometColumnarToRowExec(child: SparkPlan) // For ArrayType: (offsetAddrVar, reusableArrayVar) arrayInfo: Option[(String, String)] = None, // For MapType: (offsetAddrVar, reusableMapVar) - mapInfo: Option[(String, String)] = None) + mapInfo: Option[(String, String)] = None, + // For StructType: reusableRowVar + structInfo: Option[String] = None) val columnInfos = output.zipWithIndex.map { case (attr, i) => val colVarName = ctx.addMutableState(columnVectorClzs(i), s"colInstance$i") @@ -403,6 +446,23 @@ case class CometColumnarToRowExec(child: SparkPlan) attr.nullable, mapInfo = Some((offsetAddrVar, reusableMapVar))) + case _: StructType => + val reusableRowVar = ctx.addMutableState(cometRowClz, s"reusableRow$i") + // scalastyle:off line.size.limit + val extraAssign = + s""" + |if ($colVarName instanceof $cometStructVectorClz) { + | $reusableRowVar = new $cometRowClz($colVarName); + |} + """.stripMargin + // scalastyle:on line.size.limit + ColumnInfo( + colVarName, + baseAssign + extraAssign, + attr.dataType, + attr.nullable, + structInfo = Some(reusableRowVar)) + case _ => ColumnInfo(colVarName, baseAssign, attr.dataType, attr.nullable) } @@ -428,8 +488,8 @@ case class CometColumnarToRowExec(child: SparkPlan) ctx.currentVars = null val rowidx = ctx.freshName("rowIdx") val columnsBatchInput = columnInfos.map { info => - (info.arrayInfo, info.mapInfo) match { - case (Some((offsetAddrVar, reusableArrayVar)), _) => + (info.arrayInfo, info.mapInfo, info.structInfo) match { + case (Some((offsetAddrVar, reusableArrayVar)), _, _) => // Use optimized code generation for ArrayType with reusable wrapper genCodeForCometArray( ctx, @@ -439,7 +499,7 @@ case class CometColumnarToRowExec(child: SparkPlan) reusableArrayVar, info.dataType, info.nullable) - case (_, Some((offsetAddrVar, reusableMapVar))) => + case (_, Some((offsetAddrVar, reusableMapVar)), _) => // Use optimized code generation for MapType with reusable wrapper genCodeForCometMap( ctx, @@ -449,6 +509,15 @@ case class CometColumnarToRowExec(child: SparkPlan) reusableMapVar, info.dataType, info.nullable) + case (_, _, Some(reusableRowVar)) => + // Use optimized code generation for StructType with reusable wrapper + genCodeForCometStruct( + ctx, + info.colVar, + rowidx, + reusableRowVar, + info.dataType, + info.nullable) case _ => // Use standard code generation for other types genCodeColumnVector(ctx, info.colVar, rowidx, info.dataType, info.nullable) From d3eb1f703ea757451e203586254be9c20fee3575 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 17 Jan 2026 20:30:08 -0700 Subject: [PATCH 5/9] fix: add getVariant() method for Spark 4.x compatibility Add getVariant() method to CometColumnarArray and CometColumnarRow without @Override annotation to maintain compatibility with both Spark 3.x (which doesn't have this method) and Spark 4.x (which requires it). 
Co-Authored-By: Claude Opus 4.5 --- .../java/org/apache/comet/vector/CometColumnarArray.java | 5 +++++ .../main/java/org/apache/comet/vector/CometColumnarRow.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/common/src/main/java/org/apache/comet/vector/CometColumnarArray.java b/common/src/main/java/org/apache/comet/vector/CometColumnarArray.java index c370c82273..748a29bf22 100644 --- a/common/src/main/java/org/apache/comet/vector/CometColumnarArray.java +++ b/common/src/main/java/org/apache/comet/vector/CometColumnarArray.java @@ -175,6 +175,11 @@ public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numF return data.getStruct(offset + ordinal); } + // Note: getVariant is added in Spark 4.x, no @Override to maintain Spark 3.x compatibility + public Object getVariant(int ordinal) { + throw new UnsupportedOperationException("Variant type is not supported"); + } + @Override public Object get(int ordinal, DataType dataType) { if (isNullAt(ordinal)) { diff --git a/common/src/main/java/org/apache/comet/vector/CometColumnarRow.java b/common/src/main/java/org/apache/comet/vector/CometColumnarRow.java index 9e85a3f6b6..3ea6a3e0f6 100644 --- a/common/src/main/java/org/apache/comet/vector/CometColumnarRow.java +++ b/common/src/main/java/org/apache/comet/vector/CometColumnarRow.java @@ -166,6 +166,11 @@ public MapData getMap(int ordinal) { return data.getChild(ordinal).getMap(rowId); } + // Note: getVariant is added in Spark 4.x, no @Override to maintain Spark 3.x compatibility + public Object getVariant(int ordinal) { + throw new UnsupportedOperationException("Variant type is not supported"); + } + @Override public Object get(int ordinal, DataType dataType) { if (isNullAt(ordinal)) { From a18e0e1576022b4c214e7d00131176a20b3dde41 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 17 Jan 2026 22:57:03 -0700 Subject: [PATCH 6/9] fix: move CometColumnarArray and CometColumnarRow to version-specific dirs Move these classes to spark-3.x and spark-4.0 directories to handle the getVariant() method difference: - Spark 3.x: SpecializedGetters doesn't have getVariant() - Spark 4.x: SpecializedGetters.getVariant() returns VariantVal This fixes the CI build failures for Spark 4.x. Co-Authored-By: Claude Opus 4.5 --- .../comet/vector/CometColumnarArray.java | 186 ++++++++++++++++++ .../apache/comet/vector/CometColumnarRow.java | 177 +++++++++++++++++ .../comet/vector/CometColumnarArray.java | 5 +- .../apache/comet/vector/CometColumnarRow.java | 5 +- 4 files changed, 369 insertions(+), 4 deletions(-) create mode 100644 common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java create mode 100644 common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java rename common/src/main/{java => spark-4.0}/org/apache/comet/vector/CometColumnarArray.java (97%) rename common/src/main/{java => spark-4.0}/org/apache/comet/vector/CometColumnarRow.java (97%) diff --git a/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java new file mode 100644 index 0000000000..c370c82273 --- /dev/null +++ b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.vector; + +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A mutable implementation of ArrayData backed by a ColumnVector. Unlike Spark's ColumnarArray + * which has final fields, this class allows updating the offset and length to enable object reuse + * across rows, reducing GC pressure. + */ +public class CometColumnarArray extends ArrayData { + private ColumnVector data; + private int offset; + private int length; + + public CometColumnarArray(ColumnVector data) { + this.data = data; + this.offset = 0; + this.length = 0; + } + + public CometColumnarArray(ColumnVector data, int offset, int length) { + this.data = data; + this.offset = offset; + this.length = length; + } + + /** Updates this array to point to a new slice of the underlying data. */ + public void update(int offset, int length) { + this.offset = offset; + this.length = length; + } + + /** Updates both the data vector and the slice. 
*/ + public void update(ColumnVector data, int offset, int length) { + this.data = data; + this.offset = offset; + this.length = length; + } + + @Override + public int numElements() { + return length; + } + + @Override + public ArrayData copy() { + Object[] values = new Object[length]; + for (int i = 0; i < length; i++) { + if (!isNullAt(i)) { + values[i] = get(i, data.dataType()); + } + } + return new GenericArrayData(values); + } + + @Override + public Object[] array() { + DataType dt = data.dataType(); + Object[] values = new Object[length]; + for (int i = 0; i < length; i++) { + if (!isNullAt(i)) { + values[i] = get(i, dt); + } + } + return values; + } + + @Override + public void setNullAt(int i) { + throw new UnsupportedOperationException("CometColumnarArray is read-only"); + } + + @Override + public void update(int i, Object value) { + throw new UnsupportedOperationException("CometColumnarArray is read-only"); + } + + @Override + public boolean isNullAt(int ordinal) { + return data.isNullAt(offset + ordinal); + } + + @Override + public boolean getBoolean(int ordinal) { + return data.getBoolean(offset + ordinal); + } + + @Override + public byte getByte(int ordinal) { + return data.getByte(offset + ordinal); + } + + @Override + public short getShort(int ordinal) { + return data.getShort(offset + ordinal); + } + + @Override + public int getInt(int ordinal) { + return data.getInt(offset + ordinal); + } + + @Override + public long getLong(int ordinal) { + return data.getLong(offset + ordinal); + } + + @Override + public float getFloat(int ordinal) { + return data.getFloat(offset + ordinal); + } + + @Override + public double getDouble(int ordinal) { + return data.getDouble(offset + ordinal); + } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) { + return data.getDecimal(offset + ordinal, precision, scale); + } + + @Override + public UTF8String getUTF8String(int ordinal) { + return data.getUTF8String(offset + ordinal); + } + + @Override + public byte[] getBinary(int ordinal) { + return data.getBinary(offset + ordinal); + } + + @Override + public CalendarInterval getInterval(int ordinal) { + return data.getInterval(offset + ordinal); + } + + @Override + public ArrayData getArray(int ordinal) { + return data.getArray(offset + ordinal); + } + + @Override + public org.apache.spark.sql.catalyst.util.MapData getMap(int ordinal) { + return data.getMap(offset + ordinal); + } + + @Override + public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numFields) { + return data.getStruct(offset + ordinal); + } + + @Override + public Object get(int ordinal, DataType dataType) { + if (isNullAt(ordinal)) { + return null; + } + return org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader.read( + this, ordinal, dataType, true, true); + } +} diff --git a/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java new file mode 100644 index 0000000000..9e85a3f6b6 --- /dev/null +++ b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.vector; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A mutable implementation of InternalRow backed by a ColumnVector for struct types. Unlike Spark's + * ColumnarRow which has final fields, this class allows updating the rowId to enable object reuse + * across rows, reducing GC pressure. + */ +public class CometColumnarRow extends InternalRow { + private ColumnVector data; + private int rowId; + private int numFields; + + public CometColumnarRow(ColumnVector data) { + this.data = data; + this.rowId = 0; + this.numFields = ((StructType) data.dataType()).size(); + } + + public CometColumnarRow(ColumnVector data, int rowId) { + this.data = data; + this.rowId = rowId; + this.numFields = ((StructType) data.dataType()).size(); + } + + /** Updates this row to point to a different row in the underlying data. */ + public void update(int rowId) { + this.rowId = rowId; + } + + @Override + public int numFields() { + return numFields; + } + + @Override + public void setNullAt(int i) { + throw new UnsupportedOperationException("CometColumnarRow is read-only"); + } + + @Override + public void update(int i, Object value) { + throw new UnsupportedOperationException("CometColumnarRow is read-only"); + } + + @Override + public InternalRow copy() { + GenericInternalRow row = new GenericInternalRow(numFields); + for (int i = 0; i < numFields; i++) { + if (isNullAt(i)) { + row.setNullAt(i); + } else { + DataType dt = data.getChild(i).dataType(); + row.update(i, get(i, dt)); + } + } + return row; + } + + @Override + public boolean anyNull() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isNullAt(int ordinal) { + return data.getChild(ordinal).isNullAt(rowId); + } + + @Override + public boolean getBoolean(int ordinal) { + return data.getChild(ordinal).getBoolean(rowId); + } + + @Override + public byte getByte(int ordinal) { + return data.getChild(ordinal).getByte(rowId); + } + + @Override + public short getShort(int ordinal) { + return data.getChild(ordinal).getShort(rowId); + } + + @Override + public int getInt(int ordinal) { + return data.getChild(ordinal).getInt(rowId); + } + + @Override + public long getLong(int ordinal) { + return data.getChild(ordinal).getLong(rowId); + } + + @Override + public float getFloat(int ordinal) { + return data.getChild(ordinal).getFloat(rowId); + } + + @Override + public double getDouble(int ordinal) { + return data.getChild(ordinal).getDouble(rowId); + } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) { + return data.getChild(ordinal).getDecimal(rowId, precision, scale); + } + + 
@Override + public UTF8String getUTF8String(int ordinal) { + return data.getChild(ordinal).getUTF8String(rowId); + } + + @Override + public byte[] getBinary(int ordinal) { + return data.getChild(ordinal).getBinary(rowId); + } + + @Override + public CalendarInterval getInterval(int ordinal) { + return data.getChild(ordinal).getInterval(rowId); + } + + @Override + public InternalRow getStruct(int ordinal, int numFields) { + return data.getChild(ordinal).getStruct(rowId); + } + + @Override + public ArrayData getArray(int ordinal) { + return data.getChild(ordinal).getArray(rowId); + } + + @Override + public MapData getMap(int ordinal) { + return data.getChild(ordinal).getMap(rowId); + } + + @Override + public Object get(int ordinal, DataType dataType) { + if (isNullAt(ordinal)) { + return null; + } + return org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader.read( + this, ordinal, dataType, true, true); + } +} diff --git a/common/src/main/java/org/apache/comet/vector/CometColumnarArray.java b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarArray.java similarity index 97% rename from common/src/main/java/org/apache/comet/vector/CometColumnarArray.java rename to common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarArray.java index 748a29bf22..15980a55c8 100644 --- a/common/src/main/java/org/apache/comet/vector/CometColumnarArray.java +++ b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarArray.java @@ -26,6 +26,7 @@ import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; /** * A mutable implementation of ArrayData backed by a ColumnVector. Unlike Spark's ColumnarArray @@ -175,8 +176,8 @@ public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numF return data.getStruct(offset + ordinal); } - // Note: getVariant is added in Spark 4.x, no @Override to maintain Spark 3.x compatibility - public Object getVariant(int ordinal) { + @Override + public VariantVal getVariant(int ordinal) { throw new UnsupportedOperationException("Variant type is not supported"); } diff --git a/common/src/main/java/org/apache/comet/vector/CometColumnarRow.java b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarRow.java similarity index 97% rename from common/src/main/java/org/apache/comet/vector/CometColumnarRow.java rename to common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarRow.java index 3ea6a3e0f6..fbae928887 100644 --- a/common/src/main/java/org/apache/comet/vector/CometColumnarRow.java +++ b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarRow.java @@ -29,6 +29,7 @@ import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; /** * A mutable implementation of InternalRow backed by a ColumnVector for struct types. 
Unlike Spark's @@ -166,8 +167,8 @@ public MapData getMap(int ordinal) { return data.getChild(ordinal).getMap(rowId); } - // Note: getVariant is added in Spark 4.x, no @Override to maintain Spark 3.x compatibility - public Object getVariant(int ordinal) { + @Override + public VariantVal getVariant(int ordinal) { throw new UnsupportedOperationException("Variant type is not supported"); } From 92cc31cc651ef161e2c874d8ac3edc57786d01a8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 18 Jan 2026 08:59:56 -0700 Subject: [PATCH 7/9] perf: add direct memory access for fixed-width primitives Use Platform.getXxx() for direct memory access to value buffers for fixed-width primitive types (byte, short, int, long, float, double, date, timestamp). This caches the value buffer address per-batch and uses JIT-intrinsified memory operations. Co-Authored-By: Claude Opus 4.5 --- .../apache/comet/vector/CometPlainVector.java | 13 +++ .../sql/comet/CometColumnarToRowExec.scala | 98 +++++++++++++++++-- 2 files changed, 104 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/apache/comet/vector/CometPlainVector.java b/common/src/main/java/org/apache/comet/vector/CometPlainVector.java index 2a30be1b1c..6155772cd7 100644 --- a/common/src/main/java/org/apache/comet/vector/CometPlainVector.java +++ b/common/src/main/java/org/apache/comet/vector/CometPlainVector.java @@ -70,6 +70,19 @@ public void setReused(boolean isReused) { this.isReused = isReused; } + /** Returns the cached value buffer memory address for direct access. */ + public long getValueBufferAddress() { + return valueBufferAddress; + } + + /** Returns the element size in bytes for fixed-width types, or -1 for variable-width. */ + public int getElementSize() { + if (valueVector instanceof BaseFixedWidthVector) { + return ((BaseFixedWidthVector) valueVector).getTypeWidth(); + } + return -1; + } + @Override public void setNumNulls(int numNulls) { super.setNumNulls(numNulls); diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala index 4d48b7eda0..f58eaba0d6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala @@ -354,6 +354,61 @@ case class CometColumnarToRowExec(child: SparkPlan) ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType)) } + /** + * Generate optimized code for fixed-width primitive types using direct memory access. This + * caches the value buffer address per-batch and uses Platform.getXxx() for direct reads. 
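+   * As an illustration (variable names are fresh names chosen by codegen, shown here
+   * schematically), a nullable IntegerType column is expected to produce code like:
+   * {{{
+   * boolean isNull0 = colInstance0.isNullAt(rowIdx);
+   * int value0 = isNull0 ? -1 : Platform.getInt(null, valueBufferAddr0 + (long) rowIdx * 4L);
+   * }}}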
+ */ + private def genCodeForFixedWidth( + ctx: CodegenContext, + columnVar: String, + ordinal: String, + valueBufferAddrVar: String, + dataType: DataType, + nullable: Boolean): ExprCode = { + val platformClz = "org.apache.spark.unsafe.Platform" + val javaType = CodeGenerator.javaType(dataType) + + val isNullVar = if (nullable) { + JavaCode.isNullVariable(ctx.freshName("isNull")) + } else { + FalseLiteral + } + val valueVar = ctx.freshName("value") + + // Determine the Platform method and element size based on data type + val (platformMethod, elementSize) = dataType match { + case ByteType => ("getByte", 1) + case ShortType => ("getShort", 2) + case IntegerType | DateType => ("getInt", 4) + case LongType | TimestampType => ("getLong", 8) + case FloatType => ("getFloat", 4) + case DoubleType => ("getDouble", 8) + case _ => throw new IllegalArgumentException(s"Unsupported fixed-width type: $dataType") + } + + val str = s"fixedWidthVector[$columnVar, $ordinal, ${dataType.simpleString}]" + val addrExpr = s"$valueBufferAddrVar + (long) $ordinal * ${elementSize}L" + val readExpr = s"$platformClz.$platformMethod(null, $addrExpr)" + val code = code"${ctx.registerComment(str)}" + (if (nullable) { + code""" + boolean $isNullVar = $columnVar.isNullAt($ordinal); + $javaType $valueVar = $isNullVar ? + ${CodeGenerator.defaultValue(dataType)} : $readExpr; + """ + } else { + code"$javaType $valueVar = $readExpr;" + }) + ExprCode(code, isNullVar, JavaCode.variable(valueVar, dataType)) + } + + /** Check if a data type is a fixed-width primitive that can use direct memory access. */ + private def isFixedWidthPrimitive(dataType: DataType): Boolean = dataType match { + case ByteType | ShortType | IntegerType | LongType => true + case FloatType | DoubleType => true + case DateType | TimestampType => true + case _ => false + } + /** * Produce code to process the input iterator as [[ColumnarBatch]]es. This produces an * [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] for each row in each batch. @@ -379,10 +434,11 @@ case class CometColumnarToRowExec(child: SparkPlan) val cometArrayClz = classOf[CometColumnarArray].getName val cometMapClz = classOf[CometColumnarMap].getName val cometRowClz = classOf[CometColumnarRow].getName + val cometPlainVectorClz = classOf[CometPlainVector].getName // For each column, create mutable state and assignment code. - // For ArrayType, MapType, and StructType, also create cached state for offset addresses, - // child vectors, and reusable wrapper objects. + // For ArrayType, MapType, StructType, and fixed-width primitives, also create cached state + // for offset addresses, child vectors, value buffer addresses, and reusable wrapper objects. 
case class ColumnInfo( colVar: String, assignCode: String, @@ -393,7 +449,9 @@ case class CometColumnarToRowExec(child: SparkPlan) // For MapType: (offsetAddrVar, reusableMapVar) mapInfo: Option[(String, String)] = None, // For StructType: reusableRowVar - structInfo: Option[String] = None) + structInfo: Option[String] = None, + // For fixed-width primitives: valueBufferAddrVar + fixedWidthInfo: Option[String] = None) val columnInfos = output.zipWithIndex.map { case (attr, i) => val colVarName = ctx.addMutableState(columnVectorClzs(i), s"colInstance$i") @@ -463,6 +521,23 @@ case class CometColumnarToRowExec(child: SparkPlan) attr.nullable, structInfo = Some(reusableRowVar)) + case dt if isFixedWidthPrimitive(dt) => + val valueBufferAddrVar = ctx.addMutableState("long", s"valueBufferAddr$i") + // scalastyle:off line.size.limit + val extraAssign = + s""" + |if ($colVarName instanceof $cometPlainVectorClz) { + | $valueBufferAddrVar = (($cometPlainVectorClz) $colVarName).getValueBufferAddress(); + |} + """.stripMargin + // scalastyle:on line.size.limit + ColumnInfo( + colVarName, + baseAssign + extraAssign, + attr.dataType, + attr.nullable, + fixedWidthInfo = Some(valueBufferAddrVar)) + case _ => ColumnInfo(colVarName, baseAssign, attr.dataType, attr.nullable) } @@ -488,8 +563,8 @@ case class CometColumnarToRowExec(child: SparkPlan) ctx.currentVars = null val rowidx = ctx.freshName("rowIdx") val columnsBatchInput = columnInfos.map { info => - (info.arrayInfo, info.mapInfo, info.structInfo) match { - case (Some((offsetAddrVar, reusableArrayVar)), _, _) => + (info.arrayInfo, info.mapInfo, info.structInfo, info.fixedWidthInfo) match { + case (Some((offsetAddrVar, reusableArrayVar)), _, _, _) => // Use optimized code generation for ArrayType with reusable wrapper genCodeForCometArray( ctx, @@ -499,7 +574,7 @@ case class CometColumnarToRowExec(child: SparkPlan) reusableArrayVar, info.dataType, info.nullable) - case (_, Some((offsetAddrVar, reusableMapVar)), _) => + case (_, Some((offsetAddrVar, reusableMapVar)), _, _) => // Use optimized code generation for MapType with reusable wrapper genCodeForCometMap( ctx, @@ -509,7 +584,7 @@ case class CometColumnarToRowExec(child: SparkPlan) reusableMapVar, info.dataType, info.nullable) - case (_, _, Some(reusableRowVar)) => + case (_, _, Some(reusableRowVar), _) => // Use optimized code generation for StructType with reusable wrapper genCodeForCometStruct( ctx, @@ -518,6 +593,15 @@ case class CometColumnarToRowExec(child: SparkPlan) reusableRowVar, info.dataType, info.nullable) + case (_, _, _, Some(valueBufferAddrVar)) => + // Use optimized code generation for fixed-width primitives with direct memory access + genCodeForFixedWidth( + ctx, + info.colVar, + rowidx, + valueBufferAddrVar, + info.dataType, + info.nullable) case _ => // Use standard code generation for other types genCodeColumnVector(ctx, info.colVar, rowidx, info.dataType, info.nullable) From 564bf614299cd20f6f4332f08a672d5ef8e1170d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 18 Jan 2026 09:16:50 -0700 Subject: [PATCH 8/9] perf: add wrapper pooling for nested complex types When accessing nested arrays, maps, or structs from within complex type wrappers (CometColumnarArray, CometColumnarRow), reuse wrapper objects instead of creating new ones per-element. This eliminates per-element allocations for deeply nested types like Array(Array(Int)) or Struct(Array, Map). 
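As a standalone sketch of the pooling pattern (hypothetical helper for
illustration only; the real change pools CometColumnarArray,
CometColumnarMap, and CometColumnarRow fields directly), wrappers are
allocated lazily on first access to an ordinal and mutated in place on
later accesses. A pooled wrapper is only valid until the next access to
the same ordinal, so callers that need a stable value must copy() it:

    // Hypothetical sketch of lazy per-ordinal wrapper pooling; names
    // are illustrative and not part of this patch.
    final class WrapperPool<T> {
      interface Factory<T> {
        T create(int ordinal);
      }

      private final Object[] pool; // one reusable slot per field ordinal
      private final Factory<T> factory;

      WrapperPool(int numFields, Factory<T> factory) {
        this.pool = new Object[numFields];
        this.factory = factory;
      }

      @SuppressWarnings("unchecked")
      T get(int ordinal) {
        if (pool[ordinal] == null) {
          pool[ordinal] = factory.create(ordinal); // allocate once, then reuse
        }
        return (T) pool[ordinal]; // caller mutates/reads before next get()
      }
    }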
Co-Authored-By: Claude Opus 4.5 --- .../comet/vector/CometColumnarArray.java | 66 +++++++++++++++++-- .../apache/comet/vector/CometColumnarRow.java | 61 ++++++++++++++++- .../comet/vector/CometColumnarArray.java | 66 +++++++++++++++++-- .../apache/comet/vector/CometColumnarRow.java | 61 ++++++++++++++++- 4 files changed, 236 insertions(+), 18 deletions(-) diff --git a/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java index c370c82273..9879a8dca7 100644 --- a/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java +++ b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java @@ -19,11 +19,14 @@ package org.apache.comet.vector; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.catalyst.util.MapData; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -31,12 +34,20 @@ * A mutable implementation of ArrayData backed by a ColumnVector. Unlike Spark's ColumnarArray * which has final fields, this class allows updating the offset and length to enable object reuse * across rows, reducing GC pressure. + * + *
This class also implements wrapper pooling for nested complex types (arrays, maps, structs) to + * avoid per-element allocations when accessing nested data. */ public class CometColumnarArray extends ArrayData { private ColumnVector data; private int offset; private int length; + // Reusable wrappers for nested complex types to avoid per-element allocations + private CometColumnarArray reusableNestedArray; + private CometColumnarMap reusableNestedMap; + private CometColumnarRow reusableNestedRow; + public CometColumnarArray(ColumnVector data) { this.data = data; this.offset = 0; @@ -55,8 +66,14 @@ public void update(int offset, int length) { this.length = length; } - /** Updates both the data vector and the slice. */ + /** Updates both the data vector and the slice. Resets nested wrappers when data changes. */ public void update(ColumnVector data, int offset, int length) { + if (this.data != data) { + // Reset reusable wrappers when underlying data changes + this.reusableNestedArray = null; + this.reusableNestedMap = null; + this.reusableNestedRow = null; + } this.data = data; this.offset = offset; this.length = length; @@ -162,17 +179,54 @@ public CalendarInterval getInterval(int ordinal) { @Override public ArrayData getArray(int ordinal) { - return data.getArray(offset + ordinal); + int idx = offset + ordinal; + if (data instanceof CometListVector) { + CometListVector listVector = (CometListVector) data; + long offsetAddr = listVector.getOffsetBufferAddress(); + int start = Platform.getInt(null, offsetAddr + (long) idx * 4L); + int end = Platform.getInt(null, offsetAddr + (long) (idx + 1) * 4L); + int len = end - start; + + if (reusableNestedArray == null) { + reusableNestedArray = new CometColumnarArray(listVector.getDataColumnVector()); + } + reusableNestedArray.update(start, len); + return reusableNestedArray; + } + return data.getArray(idx); } @Override - public org.apache.spark.sql.catalyst.util.MapData getMap(int ordinal) { - return data.getMap(offset + ordinal); + public MapData getMap(int ordinal) { + int idx = offset + ordinal; + if (data instanceof CometMapVector) { + CometMapVector mapVector = (CometMapVector) data; + long offsetAddr = mapVector.getOffsetBufferAddress(); + int start = Platform.getInt(null, offsetAddr + (long) idx * 4L); + int end = Platform.getInt(null, offsetAddr + (long) (idx + 1) * 4L); + int len = end - start; + + if (reusableNestedMap == null) { + reusableNestedMap = + new CometColumnarMap(mapVector.getKeysVector(), mapVector.getValuesVector()); + } + reusableNestedMap.update(start, len); + return reusableNestedMap; + } + return data.getMap(idx); } @Override - public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numFields) { - return data.getStruct(offset + ordinal); + public InternalRow getStruct(int ordinal, int numFields) { + int idx = offset + ordinal; + if (data instanceof CometStructVector) { + if (reusableNestedRow == null) { + reusableNestedRow = new CometColumnarRow(data); + } + reusableNestedRow.update(idx); + return reusableNestedRow; + } + return data.getStruct(idx); } @Override diff --git a/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java index 9e85a3f6b6..608d624a77 100644 --- a/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java +++ b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java @@ -27,6 +27,7 @@ import org.apache.spark.sql.types.Decimal; import 
org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -34,12 +35,20 @@ * A mutable implementation of InternalRow backed by a ColumnVector for struct types. Unlike Spark's * ColumnarRow which has final fields, this class allows updating the rowId to enable object reuse * across rows, reducing GC pressure. + * + *
This class also implements wrapper pooling for nested complex types (arrays, maps, structs) to + * avoid per-field allocations when accessing nested data. */ public class CometColumnarRow extends InternalRow { private ColumnVector data; private int rowId; private int numFields; + // Reusable wrappers for nested complex types, indexed by field ordinal + private CometColumnarArray[] reusableNestedArrays; + private CometColumnarMap[] reusableNestedMaps; + private CometColumnarRow[] reusableNestedRows; + public CometColumnarRow(ColumnVector data) { this.data = data; this.rowId = 0; @@ -153,17 +162,63 @@ public CalendarInterval getInterval(int ordinal) { @Override public InternalRow getStruct(int ordinal, int numFields) { - return data.getChild(ordinal).getStruct(rowId); + ColumnVector child = data.getChild(ordinal); + if (child instanceof CometStructVector) { + if (reusableNestedRows == null) { + reusableNestedRows = new CometColumnarRow[this.numFields]; + } + if (reusableNestedRows[ordinal] == null) { + reusableNestedRows[ordinal] = new CometColumnarRow(child); + } + reusableNestedRows[ordinal].update(rowId); + return reusableNestedRows[ordinal]; + } + return child.getStruct(rowId); } @Override public ArrayData getArray(int ordinal) { - return data.getChild(ordinal).getArray(rowId); + ColumnVector child = data.getChild(ordinal); + if (child instanceof CometListVector) { + CometListVector listVector = (CometListVector) child; + long offsetAddr = listVector.getOffsetBufferAddress(); + int start = Platform.getInt(null, offsetAddr + (long) rowId * 4L); + int end = Platform.getInt(null, offsetAddr + (long) (rowId + 1) * 4L); + int len = end - start; + + if (reusableNestedArrays == null) { + reusableNestedArrays = new CometColumnarArray[numFields]; + } + if (reusableNestedArrays[ordinal] == null) { + reusableNestedArrays[ordinal] = new CometColumnarArray(listVector.getDataColumnVector()); + } + reusableNestedArrays[ordinal].update(start, len); + return reusableNestedArrays[ordinal]; + } + return child.getArray(rowId); } @Override public MapData getMap(int ordinal) { - return data.getChild(ordinal).getMap(rowId); + ColumnVector child = data.getChild(ordinal); + if (child instanceof CometMapVector) { + CometMapVector mapVector = (CometMapVector) child; + long offsetAddr = mapVector.getOffsetBufferAddress(); + int start = Platform.getInt(null, offsetAddr + (long) rowId * 4L); + int end = Platform.getInt(null, offsetAddr + (long) (rowId + 1) * 4L); + int len = end - start; + + if (reusableNestedMaps == null) { + reusableNestedMaps = new CometColumnarMap[numFields]; + } + if (reusableNestedMaps[ordinal] == null) { + reusableNestedMaps[ordinal] = + new CometColumnarMap(mapVector.getKeysVector(), mapVector.getValuesVector()); + } + reusableNestedMaps[ordinal].update(start, len); + return reusableNestedMaps[ordinal]; + } + return child.getMap(rowId); } @Override diff --git a/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarArray.java b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarArray.java index 15980a55c8..f7d8fffbab 100644 --- a/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarArray.java +++ b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarArray.java @@ -19,11 +19,14 @@ package org.apache.comet.vector; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.catalyst.util.MapData; import 
org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; import org.apache.spark.unsafe.types.VariantVal; @@ -32,12 +35,20 @@ * A mutable implementation of ArrayData backed by a ColumnVector. Unlike Spark's ColumnarArray * which has final fields, this class allows updating the offset and length to enable object reuse * across rows, reducing GC pressure. + * + *
This class also implements wrapper pooling for nested complex types (arrays, maps, structs) to + * avoid per-element allocations when accessing nested data. */ public class CometColumnarArray extends ArrayData { private ColumnVector data; private int offset; private int length; + // Reusable wrappers for nested complex types to avoid per-element allocations + private CometColumnarArray reusableNestedArray; + private CometColumnarMap reusableNestedMap; + private CometColumnarRow reusableNestedRow; + public CometColumnarArray(ColumnVector data) { this.data = data; this.offset = 0; @@ -56,8 +67,14 @@ public void update(int offset, int length) { this.length = length; } - /** Updates both the data vector and the slice. */ + /** Updates both the data vector and the slice. Resets nested wrappers when data changes. */ public void update(ColumnVector data, int offset, int length) { + if (this.data != data) { + // Reset reusable wrappers when underlying data changes + this.reusableNestedArray = null; + this.reusableNestedMap = null; + this.reusableNestedRow = null; + } this.data = data; this.offset = offset; this.length = length; @@ -163,17 +180,54 @@ public CalendarInterval getInterval(int ordinal) { @Override public ArrayData getArray(int ordinal) { - return data.getArray(offset + ordinal); + int idx = offset + ordinal; + if (data instanceof CometListVector) { + CometListVector listVector = (CometListVector) data; + long offsetAddr = listVector.getOffsetBufferAddress(); + int start = Platform.getInt(null, offsetAddr + (long) idx * 4L); + int end = Platform.getInt(null, offsetAddr + (long) (idx + 1) * 4L); + int len = end - start; + + if (reusableNestedArray == null) { + reusableNestedArray = new CometColumnarArray(listVector.getDataColumnVector()); + } + reusableNestedArray.update(start, len); + return reusableNestedArray; + } + return data.getArray(idx); } @Override - public org.apache.spark.sql.catalyst.util.MapData getMap(int ordinal) { - return data.getMap(offset + ordinal); + public MapData getMap(int ordinal) { + int idx = offset + ordinal; + if (data instanceof CometMapVector) { + CometMapVector mapVector = (CometMapVector) data; + long offsetAddr = mapVector.getOffsetBufferAddress(); + int start = Platform.getInt(null, offsetAddr + (long) idx * 4L); + int end = Platform.getInt(null, offsetAddr + (long) (idx + 1) * 4L); + int len = end - start; + + if (reusableNestedMap == null) { + reusableNestedMap = + new CometColumnarMap(mapVector.getKeysVector(), mapVector.getValuesVector()); + } + reusableNestedMap.update(start, len); + return reusableNestedMap; + } + return data.getMap(idx); } @Override - public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numFields) { - return data.getStruct(offset + ordinal); + public InternalRow getStruct(int ordinal, int numFields) { + int idx = offset + ordinal; + if (data instanceof CometStructVector) { + if (reusableNestedRow == null) { + reusableNestedRow = new CometColumnarRow(data); + } + reusableNestedRow.update(idx); + return reusableNestedRow; + } + return data.getStruct(idx); } @Override diff --git a/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarRow.java b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarRow.java index fbae928887..e6358381ea 100644 --- a/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarRow.java +++ b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarRow.java @@ -27,6 +27,7 @@ import org.apache.spark.sql.types.Decimal; import 
org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; import org.apache.spark.unsafe.types.VariantVal; @@ -35,12 +36,20 @@ * A mutable implementation of InternalRow backed by a ColumnVector for struct types. Unlike Spark's * ColumnarRow which has final fields, this class allows updating the rowId to enable object reuse * across rows, reducing GC pressure. + * + *
This class also implements wrapper pooling for nested complex types (arrays, maps, structs) to + * avoid per-field allocations when accessing nested data. */ public class CometColumnarRow extends InternalRow { private ColumnVector data; private int rowId; private int numFields; + // Reusable wrappers for nested complex types, indexed by field ordinal + private CometColumnarArray[] reusableNestedArrays; + private CometColumnarMap[] reusableNestedMaps; + private CometColumnarRow[] reusableNestedRows; + public CometColumnarRow(ColumnVector data) { this.data = data; this.rowId = 0; @@ -154,17 +163,63 @@ public CalendarInterval getInterval(int ordinal) { @Override public InternalRow getStruct(int ordinal, int numFields) { - return data.getChild(ordinal).getStruct(rowId); + ColumnVector child = data.getChild(ordinal); + if (child instanceof CometStructVector) { + if (reusableNestedRows == null) { + reusableNestedRows = new CometColumnarRow[this.numFields]; + } + if (reusableNestedRows[ordinal] == null) { + reusableNestedRows[ordinal] = new CometColumnarRow(child); + } + reusableNestedRows[ordinal].update(rowId); + return reusableNestedRows[ordinal]; + } + return child.getStruct(rowId); } @Override public ArrayData getArray(int ordinal) { - return data.getChild(ordinal).getArray(rowId); + ColumnVector child = data.getChild(ordinal); + if (child instanceof CometListVector) { + CometListVector listVector = (CometListVector) child; + long offsetAddr = listVector.getOffsetBufferAddress(); + int start = Platform.getInt(null, offsetAddr + (long) rowId * 4L); + int end = Platform.getInt(null, offsetAddr + (long) (rowId + 1) * 4L); + int len = end - start; + + if (reusableNestedArrays == null) { + reusableNestedArrays = new CometColumnarArray[numFields]; + } + if (reusableNestedArrays[ordinal] == null) { + reusableNestedArrays[ordinal] = new CometColumnarArray(listVector.getDataColumnVector()); + } + reusableNestedArrays[ordinal].update(start, len); + return reusableNestedArrays[ordinal]; + } + return child.getArray(rowId); } @Override public MapData getMap(int ordinal) { - return data.getChild(ordinal).getMap(rowId); + ColumnVector child = data.getChild(ordinal); + if (child instanceof CometMapVector) { + CometMapVector mapVector = (CometMapVector) child; + long offsetAddr = mapVector.getOffsetBufferAddress(); + int start = Platform.getInt(null, offsetAddr + (long) rowId * 4L); + int end = Platform.getInt(null, offsetAddr + (long) (rowId + 1) * 4L); + int len = end - start; + + if (reusableNestedMaps == null) { + reusableNestedMaps = new CometColumnarMap[numFields]; + } + if (reusableNestedMaps[ordinal] == null) { + reusableNestedMaps[ordinal] = + new CometColumnarMap(mapVector.getKeysVector(), mapVector.getValuesVector()); + } + reusableNestedMaps[ordinal].update(start, len); + return reusableNestedMaps[ordinal]; + } + return child.getMap(rowId); } @Override From 5a872dadb57656c38cc4849f495ae5618d6fb8c8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 18 Jan 2026 09:19:51 -0700 Subject: [PATCH 9/9] Revert "perf: add wrapper pooling for nested complex types" This reverts commit 564bf614299cd20f6f4332f08a672d5ef8e1170d. 
--- .../comet/vector/CometColumnarArray.java | 66 ++----------------- .../apache/comet/vector/CometColumnarRow.java | 61 +---------------- .../comet/vector/CometColumnarArray.java | 66 ++----------------- .../apache/comet/vector/CometColumnarRow.java | 61 +---------------- 4 files changed, 18 insertions(+), 236 deletions(-) diff --git a/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java index 9879a8dca7..c370c82273 100644 --- a/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java +++ b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarArray.java @@ -19,14 +19,11 @@ package org.apache.comet.vector; -import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.GenericArrayData; -import org.apache.spark.sql.catalyst.util.MapData; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.vectorized.ColumnVector; -import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -34,20 +31,12 @@ * A mutable implementation of ArrayData backed by a ColumnVector. Unlike Spark's ColumnarArray * which has final fields, this class allows updating the offset and length to enable object reuse * across rows, reducing GC pressure. - * - *
This class also implements wrapper pooling for nested complex types (arrays, maps, structs) to - * avoid per-element allocations when accessing nested data. */ public class CometColumnarArray extends ArrayData { private ColumnVector data; private int offset; private int length; - // Reusable wrappers for nested complex types to avoid per-element allocations - private CometColumnarArray reusableNestedArray; - private CometColumnarMap reusableNestedMap; - private CometColumnarRow reusableNestedRow; - public CometColumnarArray(ColumnVector data) { this.data = data; this.offset = 0; @@ -66,14 +55,8 @@ public void update(int offset, int length) { this.length = length; } - /** Updates both the data vector and the slice. Resets nested wrappers when data changes. */ + /** Updates both the data vector and the slice. */ public void update(ColumnVector data, int offset, int length) { - if (this.data != data) { - // Reset reusable wrappers when underlying data changes - this.reusableNestedArray = null; - this.reusableNestedMap = null; - this.reusableNestedRow = null; - } this.data = data; this.offset = offset; this.length = length; @@ -179,54 +162,17 @@ public CalendarInterval getInterval(int ordinal) { @Override public ArrayData getArray(int ordinal) { - int idx = offset + ordinal; - if (data instanceof CometListVector) { - CometListVector listVector = (CometListVector) data; - long offsetAddr = listVector.getOffsetBufferAddress(); - int start = Platform.getInt(null, offsetAddr + (long) idx * 4L); - int end = Platform.getInt(null, offsetAddr + (long) (idx + 1) * 4L); - int len = end - start; - - if (reusableNestedArray == null) { - reusableNestedArray = new CometColumnarArray(listVector.getDataColumnVector()); - } - reusableNestedArray.update(start, len); - return reusableNestedArray; - } - return data.getArray(idx); + return data.getArray(offset + ordinal); } @Override - public MapData getMap(int ordinal) { - int idx = offset + ordinal; - if (data instanceof CometMapVector) { - CometMapVector mapVector = (CometMapVector) data; - long offsetAddr = mapVector.getOffsetBufferAddress(); - int start = Platform.getInt(null, offsetAddr + (long) idx * 4L); - int end = Platform.getInt(null, offsetAddr + (long) (idx + 1) * 4L); - int len = end - start; - - if (reusableNestedMap == null) { - reusableNestedMap = - new CometColumnarMap(mapVector.getKeysVector(), mapVector.getValuesVector()); - } - reusableNestedMap.update(start, len); - return reusableNestedMap; - } - return data.getMap(idx); + public org.apache.spark.sql.catalyst.util.MapData getMap(int ordinal) { + return data.getMap(offset + ordinal); } @Override - public InternalRow getStruct(int ordinal, int numFields) { - int idx = offset + ordinal; - if (data instanceof CometStructVector) { - if (reusableNestedRow == null) { - reusableNestedRow = new CometColumnarRow(data); - } - reusableNestedRow.update(idx); - return reusableNestedRow; - } - return data.getStruct(idx); + public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numFields) { + return data.getStruct(offset + ordinal); } @Override diff --git a/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java index 608d624a77..9e85a3f6b6 100644 --- a/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java +++ b/common/src/main/spark-3.x/org/apache/comet/vector/CometColumnarRow.java @@ -27,7 +27,6 @@ import org.apache.spark.sql.types.Decimal; import 
org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnVector; -import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -35,20 +34,12 @@ * A mutable implementation of InternalRow backed by a ColumnVector for struct types. Unlike Spark's * ColumnarRow which has final fields, this class allows updating the rowId to enable object reuse * across rows, reducing GC pressure. - * - *
This class also implements wrapper pooling for nested complex types (arrays, maps, structs) to - * avoid per-field allocations when accessing nested data. */ public class CometColumnarRow extends InternalRow { private ColumnVector data; private int rowId; private int numFields; - // Reusable wrappers for nested complex types, indexed by field ordinal - private CometColumnarArray[] reusableNestedArrays; - private CometColumnarMap[] reusableNestedMaps; - private CometColumnarRow[] reusableNestedRows; - public CometColumnarRow(ColumnVector data) { this.data = data; this.rowId = 0; @@ -162,63 +153,17 @@ public CalendarInterval getInterval(int ordinal) { @Override public InternalRow getStruct(int ordinal, int numFields) { - ColumnVector child = data.getChild(ordinal); - if (child instanceof CometStructVector) { - if (reusableNestedRows == null) { - reusableNestedRows = new CometColumnarRow[this.numFields]; - } - if (reusableNestedRows[ordinal] == null) { - reusableNestedRows[ordinal] = new CometColumnarRow(child); - } - reusableNestedRows[ordinal].update(rowId); - return reusableNestedRows[ordinal]; - } - return child.getStruct(rowId); + return data.getChild(ordinal).getStruct(rowId); } @Override public ArrayData getArray(int ordinal) { - ColumnVector child = data.getChild(ordinal); - if (child instanceof CometListVector) { - CometListVector listVector = (CometListVector) child; - long offsetAddr = listVector.getOffsetBufferAddress(); - int start = Platform.getInt(null, offsetAddr + (long) rowId * 4L); - int end = Platform.getInt(null, offsetAddr + (long) (rowId + 1) * 4L); - int len = end - start; - - if (reusableNestedArrays == null) { - reusableNestedArrays = new CometColumnarArray[numFields]; - } - if (reusableNestedArrays[ordinal] == null) { - reusableNestedArrays[ordinal] = new CometColumnarArray(listVector.getDataColumnVector()); - } - reusableNestedArrays[ordinal].update(start, len); - return reusableNestedArrays[ordinal]; - } - return child.getArray(rowId); + return data.getChild(ordinal).getArray(rowId); } @Override public MapData getMap(int ordinal) { - ColumnVector child = data.getChild(ordinal); - if (child instanceof CometMapVector) { - CometMapVector mapVector = (CometMapVector) child; - long offsetAddr = mapVector.getOffsetBufferAddress(); - int start = Platform.getInt(null, offsetAddr + (long) rowId * 4L); - int end = Platform.getInt(null, offsetAddr + (long) (rowId + 1) * 4L); - int len = end - start; - - if (reusableNestedMaps == null) { - reusableNestedMaps = new CometColumnarMap[numFields]; - } - if (reusableNestedMaps[ordinal] == null) { - reusableNestedMaps[ordinal] = - new CometColumnarMap(mapVector.getKeysVector(), mapVector.getValuesVector()); - } - reusableNestedMaps[ordinal].update(start, len); - return reusableNestedMaps[ordinal]; - } - return child.getMap(rowId); + return data.getChild(ordinal).getMap(rowId); } @Override diff --git a/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarArray.java b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarArray.java index f7d8fffbab..15980a55c8 100644 --- a/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarArray.java +++ b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarArray.java @@ -19,14 +19,11 @@ package org.apache.comet.vector; -import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.GenericArrayData; -import org.apache.spark.sql.catalyst.util.MapData; import 
org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.vectorized.ColumnVector; -import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; import org.apache.spark.unsafe.types.VariantVal; @@ -35,20 +32,12 @@ * A mutable implementation of ArrayData backed by a ColumnVector. Unlike Spark's ColumnarArray * which has final fields, this class allows updating the offset and length to enable object reuse * across rows, reducing GC pressure. - * - *
This class also implements wrapper pooling for nested complex types (arrays, maps, structs) to - * avoid per-element allocations when accessing nested data. */ public class CometColumnarArray extends ArrayData { private ColumnVector data; private int offset; private int length; - // Reusable wrappers for nested complex types to avoid per-element allocations - private CometColumnarArray reusableNestedArray; - private CometColumnarMap reusableNestedMap; - private CometColumnarRow reusableNestedRow; - public CometColumnarArray(ColumnVector data) { this.data = data; this.offset = 0; @@ -67,14 +56,8 @@ public void update(int offset, int length) { this.length = length; } - /** Updates both the data vector and the slice. Resets nested wrappers when data changes. */ + /** Updates both the data vector and the slice. */ public void update(ColumnVector data, int offset, int length) { - if (this.data != data) { - // Reset reusable wrappers when underlying data changes - this.reusableNestedArray = null; - this.reusableNestedMap = null; - this.reusableNestedRow = null; - } this.data = data; this.offset = offset; this.length = length; @@ -180,54 +163,17 @@ public CalendarInterval getInterval(int ordinal) { @Override public ArrayData getArray(int ordinal) { - int idx = offset + ordinal; - if (data instanceof CometListVector) { - CometListVector listVector = (CometListVector) data; - long offsetAddr = listVector.getOffsetBufferAddress(); - int start = Platform.getInt(null, offsetAddr + (long) idx * 4L); - int end = Platform.getInt(null, offsetAddr + (long) (idx + 1) * 4L); - int len = end - start; - - if (reusableNestedArray == null) { - reusableNestedArray = new CometColumnarArray(listVector.getDataColumnVector()); - } - reusableNestedArray.update(start, len); - return reusableNestedArray; - } - return data.getArray(idx); + return data.getArray(offset + ordinal); } @Override - public MapData getMap(int ordinal) { - int idx = offset + ordinal; - if (data instanceof CometMapVector) { - CometMapVector mapVector = (CometMapVector) data; - long offsetAddr = mapVector.getOffsetBufferAddress(); - int start = Platform.getInt(null, offsetAddr + (long) idx * 4L); - int end = Platform.getInt(null, offsetAddr + (long) (idx + 1) * 4L); - int len = end - start; - - if (reusableNestedMap == null) { - reusableNestedMap = - new CometColumnarMap(mapVector.getKeysVector(), mapVector.getValuesVector()); - } - reusableNestedMap.update(start, len); - return reusableNestedMap; - } - return data.getMap(idx); + public org.apache.spark.sql.catalyst.util.MapData getMap(int ordinal) { + return data.getMap(offset + ordinal); } @Override - public InternalRow getStruct(int ordinal, int numFields) { - int idx = offset + ordinal; - if (data instanceof CometStructVector) { - if (reusableNestedRow == null) { - reusableNestedRow = new CometColumnarRow(data); - } - reusableNestedRow.update(idx); - return reusableNestedRow; - } - return data.getStruct(idx); + public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numFields) { + return data.getStruct(offset + ordinal); } @Override diff --git a/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarRow.java b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarRow.java index e6358381ea..fbae928887 100644 --- a/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarRow.java +++ b/common/src/main/spark-4.0/org/apache/comet/vector/CometColumnarRow.java @@ -27,7 +27,6 @@ import org.apache.spark.sql.types.Decimal; import 
org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnVector; -import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; import org.apache.spark.unsafe.types.VariantVal; @@ -36,20 +35,12 @@ * A mutable implementation of InternalRow backed by a ColumnVector for struct types. Unlike Spark's * ColumnarRow which has final fields, this class allows updating the rowId to enable object reuse * across rows, reducing GC pressure. - * - *
This class also implements wrapper pooling for nested complex types (arrays, maps, structs) to - * avoid per-field allocations when accessing nested data. */ public class CometColumnarRow extends InternalRow { private ColumnVector data; private int rowId; private int numFields; - // Reusable wrappers for nested complex types, indexed by field ordinal - private CometColumnarArray[] reusableNestedArrays; - private CometColumnarMap[] reusableNestedMaps; - private CometColumnarRow[] reusableNestedRows; - public CometColumnarRow(ColumnVector data) { this.data = data; this.rowId = 0; @@ -163,63 +154,17 @@ public CalendarInterval getInterval(int ordinal) { @Override public InternalRow getStruct(int ordinal, int numFields) { - ColumnVector child = data.getChild(ordinal); - if (child instanceof CometStructVector) { - if (reusableNestedRows == null) { - reusableNestedRows = new CometColumnarRow[this.numFields]; - } - if (reusableNestedRows[ordinal] == null) { - reusableNestedRows[ordinal] = new CometColumnarRow(child); - } - reusableNestedRows[ordinal].update(rowId); - return reusableNestedRows[ordinal]; - } - return child.getStruct(rowId); + return data.getChild(ordinal).getStruct(rowId); } @Override public ArrayData getArray(int ordinal) { - ColumnVector child = data.getChild(ordinal); - if (child instanceof CometListVector) { - CometListVector listVector = (CometListVector) child; - long offsetAddr = listVector.getOffsetBufferAddress(); - int start = Platform.getInt(null, offsetAddr + (long) rowId * 4L); - int end = Platform.getInt(null, offsetAddr + (long) (rowId + 1) * 4L); - int len = end - start; - - if (reusableNestedArrays == null) { - reusableNestedArrays = new CometColumnarArray[numFields]; - } - if (reusableNestedArrays[ordinal] == null) { - reusableNestedArrays[ordinal] = new CometColumnarArray(listVector.getDataColumnVector()); - } - reusableNestedArrays[ordinal].update(start, len); - return reusableNestedArrays[ordinal]; - } - return child.getArray(rowId); + return data.getChild(ordinal).getArray(rowId); } @Override public MapData getMap(int ordinal) { - ColumnVector child = data.getChild(ordinal); - if (child instanceof CometMapVector) { - CometMapVector mapVector = (CometMapVector) child; - long offsetAddr = mapVector.getOffsetBufferAddress(); - int start = Platform.getInt(null, offsetAddr + (long) rowId * 4L); - int end = Platform.getInt(null, offsetAddr + (long) (rowId + 1) * 4L); - int len = end - start; - - if (reusableNestedMaps == null) { - reusableNestedMaps = new CometColumnarMap[numFields]; - } - if (reusableNestedMaps[ordinal] == null) { - reusableNestedMaps[ordinal] = - new CometColumnarMap(mapVector.getKeysVector(), mapVector.getValuesVector()); - } - reusableNestedMaps[ordinal].update(start, len); - return reusableNestedMaps[ordinal]; - } - return child.getMap(rowId); + return data.getChild(ordinal).getMap(rowId); } @Override