2 changes: 1 addition & 1 deletion benchmarks/pyspark/run_all_benchmarks.sh
@@ -25,7 +25,7 @@ set -e

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
DATA_PATH="${1:-/tmp/shuffle-benchmark-data}"
COMET_JAR="${COMET_JAR:-$SCRIPT_DIR/../spark/target/comet-spark-spark3.5_2.12-0.13.0-SNAPSHOT.jar}"
COMET_JAR="${COMET_JAR:-$SCRIPT_DIR/../../spark/target/comet-spark-spark3.5_2.12-0.13.0-SNAPSHOT.jar}"
SPARK_MASTER="${SPARK_MASTER:-local[*]}"
EXECUTOR_MEMORY="${EXECUTOR_MEMORY:-16g}"
EVENT_LOG_DIR="${EVENT_LOG_DIR:-/tmp/spark-events}"
75 changes: 75 additions & 0 deletions common/src/main/java/org/apache/comet/vector/CometColumnarMap.java
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.vector;

import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.vectorized.ColumnVector;

/**
* A mutable implementation of MapData backed by ColumnVectors. Unlike Spark's ColumnarMap which has
* final fields, this class allows updating the offset and length to enable object reuse across
* rows, reducing GC pressure.
*/
public class CometColumnarMap extends MapData {
private final CometColumnarArray keys;
private final CometColumnarArray values;
private int length;

public CometColumnarMap(ColumnVector keysData, ColumnVector valuesData) {
this.keys = new CometColumnarArray(keysData);
this.values = new CometColumnarArray(valuesData);
this.length = 0;
}

public CometColumnarMap(ColumnVector keysData, ColumnVector valuesData, int offset, int length) {
this.keys = new CometColumnarArray(keysData, offset, length);
this.values = new CometColumnarArray(valuesData, offset, length);
this.length = length;
}

/** Updates this map to point to a new slice of the underlying data. */
public void update(int offset, int length) {
this.keys.update(offset, length);
this.values.update(offset, length);
this.length = length;
}

@Override
public int numElements() {
return length;
}

@Override
public ArrayData keyArray() {
return keys;
}

@Override
public ArrayData valueArray() {
return values;
}

@Override
public MapData copy() {
return new ArrayBasedMapData(keys.copy(), values.copy());
}
}
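
A quick sketch of the calling contract this class implies: `update` mutates the map in place, so a value that must outlive the current row has to be detached with `copy()`. Everything around the class in this sketch (the offsets array, the harness method) is hypothetical, for illustration only.

```java
import org.apache.comet.vector.CometColumnarMap;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.vectorized.ColumnVector;

// Hypothetical caller: one CometColumnarMap is allocated up front and re-pointed
// at each row's slice, instead of allocating a fresh ColumnarMap per row.
final class MapReuseSketch {
  // offsets[row] .. offsets[row + 1] delimit row's entries in keys/values.
  static MapData firstNonEmpty(ColumnVector keys, ColumnVector values, int[] offsets) {
    CometColumnarMap map = new CometColumnarMap(keys, values);
    for (int row = 0; row + 1 < offsets.length; row++) {
      map.update(offsets[row], offsets[row + 1] - offsets[row]); // in place, no allocation
      if (map.numElements() > 0) {
        return map.copy(); // detach: `map` is re-pointed on the next iteration
      }
    }
    return null;
  }
}
```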
17 changes: 15 additions & 2 deletions common/src/main/java/org/apache/comet/vector/CometListVector.java
@@ -25,13 +25,15 @@
import org.apache.arrow.vector.util.TransferPair;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.unsafe.Platform;

/** A Comet column vector for list type. */
public class CometListVector extends CometDecodedVector {
final ListVector listVector;
final ValueVector dataVector;
final ColumnVector dataColumnVector;
final DictionaryProvider dictionaryProvider;
final long offsetBufferAddress;

public CometListVector(
ValueVector vector, boolean useDecimal128, DictionaryProvider dictionaryProvider) {
Expand All @@ -41,13 +43,24 @@ public CometListVector(
this.dataVector = listVector.getDataVector();
this.dictionaryProvider = dictionaryProvider;
this.dataColumnVector = getVector(dataVector, useDecimal128, dictionaryProvider);
this.offsetBufferAddress = listVector.getOffsetBuffer().memoryAddress();
}

/** Returns the cached offset buffer memory address for direct access. */
public long getOffsetBufferAddress() {
return offsetBufferAddress;
}

/** Returns the wrapped data column vector for the array elements. */
public ColumnVector getDataColumnVector() {
return dataColumnVector;
}

@Override
public ColumnarArray getArray(int i) {
if (isNullAt(i)) return null;
-    int start = listVector.getOffsetBuffer().getInt(i * ListVector.OFFSET_WIDTH);
-    int end = listVector.getOffsetBuffer().getInt((i + 1) * ListVector.OFFSET_WIDTH);
+    int start = Platform.getInt(null, offsetBufferAddress + (long) i * ListVector.OFFSET_WIDTH);
+    int end = Platform.getInt(null, offsetBufferAddress + (long) (i + 1) * ListVector.OFFSET_WIDTH);

return new ColumnarArray(dataColumnVector, start, end - start);
}
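For readers skimming the hunk above: the patch replaces the bounds-checked `ArrowBuf.getInt` calls with raw `Platform.getInt` reads against an address cached at construction, and the `(long)` cast keeps the offset arithmetic from overflowing on large buffers. Below is a minimal standalone sketch of the same read pattern; the class and its `OFFSET_WIDTH` constant stand in for `ListVector.OFFSET_WIDTH` (4 bytes) and are illustrative, not part of the patch.

```java
import org.apache.spark.unsafe.Platform;

// Arrow list offsets are contiguous 4-byte ints: element i spans
// [offset[i], offset[i + 1]), so its bounds live at address + i * 4.
final class OffsetReadSketch {
  static final int OFFSET_WIDTH = 4; // stand-in for ListVector.OFFSET_WIDTH

  static int elementLength(long offsetBufferAddress, int i) {
    int start = Platform.getInt(null, offsetBufferAddress + (long) i * OFFSET_WIDTH);
    int end = Platform.getInt(null, offsetBufferAddress + (long) (i + 1) * OFFSET_WIDTH);
    return end - start;
  }
}
```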
22 changes: 20 additions & 2 deletions common/src/main/java/org/apache/comet/vector/CometMapVector.java
@@ -26,13 +26,15 @@
import org.apache.arrow.vector.util.TransferPair;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.Platform;

/** A Comet column vector for map type. */
public class CometMapVector extends CometDecodedVector {
final MapVector mapVector;
final ValueVector dataVector;
final CometStructVector dataColumnVector;
final DictionaryProvider dictionaryProvider;
final long offsetBufferAddress;

final ColumnVector keys;
final ColumnVector values;
Expand All @@ -44,6 +46,7 @@ public CometMapVector(
this.mapVector = ((MapVector) vector);
this.dataVector = mapVector.getDataVector();
this.dictionaryProvider = dictionaryProvider;
this.offsetBufferAddress = mapVector.getOffsetBuffer().memoryAddress();

if (dataVector instanceof StructVector) {
this.dataColumnVector = new CometStructVector(dataVector, useDecimal128, dictionaryProvider);
Expand All @@ -63,11 +66,26 @@ public CometMapVector(
}
}

/** Returns the cached offset buffer memory address for direct access. */
public long getOffsetBufferAddress() {
return offsetBufferAddress;
}

/** Returns the wrapped column vector for map keys. */
public ColumnVector getKeysVector() {
return keys;
}

/** Returns the wrapped column vector for map values. */
public ColumnVector getValuesVector() {
return values;
}

@Override
public ColumnarMap getMap(int i) {
if (isNullAt(i)) return null;
-    int start = mapVector.getOffsetBuffer().getInt(i * MapVector.OFFSET_WIDTH);
-    int end = mapVector.getOffsetBuffer().getInt((i + 1) * MapVector.OFFSET_WIDTH);
+    int start = Platform.getInt(null, offsetBufferAddress + (long) i * MapVector.OFFSET_WIDTH);
+    int end = Platform.getInt(null, offsetBufferAddress + (long) (i + 1) * MapVector.OFFSET_WIDTH);

return new ColumnarMap(keys, values, start, end - start);
}
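The new accessors on `CometMapVector` pair naturally with the reusable `CometColumnarMap` introduced above. A hedged sketch of that wiring; the consumer class and loop are hypothetical.

```java
import org.apache.comet.vector.CometColumnarMap;
import org.apache.comet.vector.CometMapVector;
import org.apache.spark.unsafe.Platform;

// Hypothetical scan: one reusable map over the vector's keys/values,
// re-pointed per row using offsets read from the cached buffer address.
final class MapScanSketch {
  static long countEntries(CometMapVector vector, int numRows) {
    CometColumnarMap reusable =
        new CometColumnarMap(vector.getKeysVector(), vector.getValuesVector());
    long addr = vector.getOffsetBufferAddress();
    long total = 0;
    for (int row = 0; row < numRows; row++) {
      if (vector.isNullAt(row)) continue; // null maps have no slice
      int start = Platform.getInt(null, addr + row * 4L);     // MapVector.OFFSET_WIDTH == 4
      int end = Platform.getInt(null, addr + (row + 1) * 4L);
      reusable.update(start, end - start); // re-point, no per-row allocation
      total += reusable.numElements();
    }
    return total;
  }
}
```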
13 changes: 13 additions & 0 deletions common/src/main/java/org/apache/comet/vector/CometPlainVector.java
@@ -70,6 +70,19 @@ public void setReused(boolean isReused) {
this.isReused = isReused;
}

/** Returns the cached value buffer memory address for direct access. */
public long getValueBufferAddress() {
return valueBufferAddress;
}

/** Returns the element size in bytes for fixed-width types, or -1 for variable-width. */
public int getElementSize() {
if (valueVector instanceof BaseFixedWidthVector) {
return ((BaseFixedWidthVector) valueVector).getTypeWidth();
}
return -1;
}

@Override
public void setNumNulls(int numNulls) {
super.setNumNulls(numNulls);
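One thing the two new accessors on `CometPlainVector` enable together is bulk copying of fixed-width data. The sketch below is a hypothetical caller, not part of the patch; note that it ignores validity (null) bits, which would have to be handled separately.

```java
import org.apache.comet.vector.CometPlainVector;
import org.apache.spark.unsafe.Platform;

// Hypothetical bulk copy: for fixed-width vectors a run of rows can be moved
// with one memcpy-style call instead of a per-element read loop.
final class FixedWidthCopySketch {
  static void copyRows(CometPlainVector src, int startRow, int numRows, byte[] dst, int dstOffset) {
    int width = src.getElementSize();
    if (width < 0) {
      throw new IllegalArgumentException("variable-width vector: copy per element instead");
    }
    Platform.copyMemory(
        null, src.getValueBufferAddress() + (long) startRow * width, // off-heap source
        dst, Platform.BYTE_ARRAY_OFFSET + dstOffset, // on-heap byte[] destination
        (long) numRows * width);
  }
}
```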
186 changes: 186 additions & 0 deletions common/src/main/java/org/apache/comet/vector/CometColumnarArray.java
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.vector;

import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

/**
* A mutable implementation of ArrayData backed by a ColumnVector. Unlike Spark's ColumnarArray
* which has final fields, this class allows updating the offset and length to enable object reuse
* across rows, reducing GC pressure.
*/
public class CometColumnarArray extends ArrayData {
private ColumnVector data;
private int offset;
private int length;

public CometColumnarArray(ColumnVector data) {
this.data = data;
this.offset = 0;
this.length = 0;
}

public CometColumnarArray(ColumnVector data, int offset, int length) {
this.data = data;
this.offset = offset;
this.length = length;
}

/** Updates this array to point to a new slice of the underlying data. */
public void update(int offset, int length) {
this.offset = offset;
this.length = length;
}

/** Updates both the data vector and the slice. */
public void update(ColumnVector data, int offset, int length) {
this.data = data;
this.offset = offset;
this.length = length;
}

@Override
public int numElements() {
return length;
}

@Override
public ArrayData copy() {
Object[] values = new Object[length];
for (int i = 0; i < length; i++) {
if (!isNullAt(i)) {
values[i] = get(i, data.dataType());
}
}
return new GenericArrayData(values);
}

@Override
public Object[] array() {
DataType dt = data.dataType();
Object[] values = new Object[length];
for (int i = 0; i < length; i++) {
if (!isNullAt(i)) {
values[i] = get(i, dt);
}
}
return values;
}

@Override
public void setNullAt(int i) {
throw new UnsupportedOperationException("CometColumnarArray is read-only");
}

@Override
public void update(int i, Object value) {
throw new UnsupportedOperationException("CometColumnarArray is read-only");
}

@Override
public boolean isNullAt(int ordinal) {
return data.isNullAt(offset + ordinal);
}

@Override
public boolean getBoolean(int ordinal) {
return data.getBoolean(offset + ordinal);
}

@Override
public byte getByte(int ordinal) {
return data.getByte(offset + ordinal);
}

@Override
public short getShort(int ordinal) {
return data.getShort(offset + ordinal);
}

@Override
public int getInt(int ordinal) {
return data.getInt(offset + ordinal);
}

@Override
public long getLong(int ordinal) {
return data.getLong(offset + ordinal);
}

@Override
public float getFloat(int ordinal) {
return data.getFloat(offset + ordinal);
}

@Override
public double getDouble(int ordinal) {
return data.getDouble(offset + ordinal);
}

@Override
public Decimal getDecimal(int ordinal, int precision, int scale) {
return data.getDecimal(offset + ordinal, precision, scale);
}

@Override
public UTF8String getUTF8String(int ordinal) {
return data.getUTF8String(offset + ordinal);
}

@Override
public byte[] getBinary(int ordinal) {
return data.getBinary(offset + ordinal);
}

@Override
public CalendarInterval getInterval(int ordinal) {
return data.getInterval(offset + ordinal);
}

@Override
public ArrayData getArray(int ordinal) {
return data.getArray(offset + ordinal);
}

@Override
public org.apache.spark.sql.catalyst.util.MapData getMap(int ordinal) {
return data.getMap(offset + ordinal);
}

@Override
public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numFields) {
return data.getStruct(offset + ordinal);
}

@Override
public Object get(int ordinal, DataType dataType) {
if (isNullAt(ordinal)) {
return null;
}
return org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader.read(
this, ordinal, dataType, true, true);
}
}
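
A short sketch of why `CometColumnarArray` carries two `update` overloads: the two-arg form re-points the slice within the current vector, while the three-arg form also swaps the backing vector, so a single reusable instance can walk several columns. The harness below (the vectors array and offsets layout) is hypothetical.

```java
import org.apache.comet.vector.CometColumnarArray;
import org.apache.spark.sql.vectorized.ColumnVector;

final class ArrayReuseSketch {
  // offsets[col][row] .. offsets[col][row + 1] bound row's elements in column col.
  static long totalLongs(ColumnVector[] elementVectors, int[][] offsets) {
    CometColumnarArray arr = new CometColumnarArray(elementVectors[0]);
    long sum = 0;
    for (int col = 0; col < elementVectors.length; col++) {
      int[] colOffsets = offsets[col];
      for (int row = 0; row + 1 < colOffsets.length; row++) {
        // Three-arg overload: swap the backing vector and the slice together.
        arr.update(elementVectors[col], colOffsets[row], colOffsets[row + 1] - colOffsets[row]);
        for (int i = 0; i < arr.numElements(); i++) {
          if (!arr.isNullAt(i)) sum += arr.getLong(i);
        }
      }
    }
    return sum;
  }
}
```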