Skip to content
2 changes: 1 addition & 1 deletion .github/workflows/jarbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

name: JarBuild
on:
on:
workflow_dispatch:
inputs:
arrow_branch:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,15 +305,29 @@ public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long

/** Set the reader and writer indexes for the inner buffers. */
private void setReaderAndWriterIndex() {
final long requiredOffsetBufferCapacity = (long) (valueCount + 1) * OFFSET_WIDTH;
validityBuffer.readerIndex(0);
offsetBuffer.readerIndex(0);
if (valueCount == 0) {
validityBuffer.writerIndex(0);
offsetBuffer.writerIndex(0);
ensureEmptyOffsetBufferCapacity(requiredOffsetBufferCapacity);
} else {
validityBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount));
offsetBuffer.writerIndex((valueCount + 1) * OFFSET_WIDTH);
}
// IPC serializers use readerIndex and writerIndex to determine readable bytes. Even when the
// list is empty, the Arrow layout requires the offset buffer to contain offset[0].
offsetBuffer.writerIndex(requiredOffsetBufferCapacity);
}

private void ensureEmptyOffsetBufferCapacity(long requiredCapacity) {
if (offsetBuffer.capacity() >= requiredCapacity) {
return;
}
long previousOffsetAllocationSizeInBytes = offsetAllocationSizeInBytes;
ArrowBuf oldOffsetBuffer = offsetBuffer;
offsetBuffer = allocateOffsetBuffer(requiredCapacity);
offsetAllocationSizeInBytes = previousOffsetAllocationSizeInBytes;
oldOffsetBuffer.getReferenceManager().release();
}

/**
Expand Down Expand Up @@ -672,24 +686,30 @@ public void splitAndTransfer(int startIndex, int length) {
startIndex,
length,
valueCount);
final long startPoint = offsetBuffer.getLong((long) startIndex * OFFSET_WIDTH);
final long sliceLength =
offsetBuffer.getLong((long) (startIndex + length) * OFFSET_WIDTH) - startPoint;
to.clear();
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
/* splitAndTransfer offset buffer */
for (int i = 0; i < length + 1; i++) {
final long relativeOffset =
offsetBuffer.getLong((long) (startIndex + i) * OFFSET_WIDTH) - startPoint;
to.offsetBuffer.setLong((long) i * OFFSET_WIDTH, relativeOffset);
if (length > 0) {
final long startPoint = offsetBuffer.getLong((long) startIndex * OFFSET_WIDTH);
final long sliceLength =
offsetBuffer.getLong((long) (startIndex + length) * OFFSET_WIDTH) - startPoint;
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
/* splitAndTransfer offset buffer */
for (int i = 0; i < length + 1; i++) {
final long relativeOffset =
offsetBuffer.getLong((long) (startIndex + i) * OFFSET_WIDTH) - startPoint;
to.offsetBuffer.setLong((long) i * OFFSET_WIDTH, relativeOffset);
}
/* splitAndTransfer validity buffer */
splitAndTransferValidityBuffer(startIndex, length, to);
/* splitAndTransfer data buffer */
dataTransferPair.splitAndTransfer(
checkedCastToInt(startPoint), checkedCastToInt(sliceLength));
to.lastSet = length - 1;
to.setValueCount(length);
} else {
to.ensureEmptyOffsetBufferCapacity(OFFSET_WIDTH);
dataTransferPair.splitAndTransfer(0, 0);
to.setValueCount(0);
}
/* splitAndTransfer validity buffer */
splitAndTransferValidityBuffer(startIndex, length, to);
/* splitAndTransfer data buffer */
dataTransferPair.splitAndTransfer(
checkedCastToInt(startPoint), checkedCastToInt(sliceLength));
to.lastSet = length - 1;
to.setValueCount(length);
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,29 @@ public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long

/** Set the reader and writer indexes for the inner buffers. */
private void setReaderAndWriterIndex() {
final long requiredOffsetBufferCapacity = (long) (valueCount + 1) * OFFSET_WIDTH;
validityBuffer.readerIndex(0);
offsetBuffer.readerIndex(0);
if (valueCount == 0) {
validityBuffer.writerIndex(0);
offsetBuffer.writerIndex(0);
ensureEmptyOffsetBufferCapacity(requiredOffsetBufferCapacity);
} else {
validityBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount));
offsetBuffer.writerIndex((valueCount + 1) * OFFSET_WIDTH);
}
// IPC serializers use readerIndex and writerIndex to determine readable bytes. Even when the
// list is empty, the Arrow layout requires the offset buffer to contain offset[0].
offsetBuffer.writerIndex(requiredOffsetBufferCapacity);
}

private void ensureEmptyOffsetBufferCapacity(long requiredCapacity) {
if (offsetBuffer.capacity() >= requiredCapacity) {
return;
}
long previousOffsetAllocationSizeInBytes = offsetAllocationSizeInBytes;
ArrowBuf oldOffsetBuffer = offsetBuffer;
offsetBuffer = allocateOffsetBuffer(requiredCapacity);
offsetAllocationSizeInBytes = previousOffsetAllocationSizeInBytes;
oldOffsetBuffer.getReferenceManager().release();
}

/**
Expand Down Expand Up @@ -570,6 +584,10 @@ public void splitAndTransfer(int startIndex, int length) {
dataTransferPair.splitAndTransfer(startPoint, sliceLength);
to.lastSet = length - 1;
to.setValueCount(length);
} else {
to.ensureEmptyOffsetBufferCapacity(OFFSET_WIDTH);
dataTransferPair.splitAndTransfer(0, 0);
to.setValueCount(0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,52 @@ public void testGetBufferSizeFor() {
}
}

@Test
public void testEmptyLargeListOffsetBuffer() {
try (LargeListVector list = LargeListVector.empty("list", allocator)) {
list.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
list.allocateNew();
list.setValueCount(0);

assertEmptyLargeListOffsetBuffer(list);
}
}

@Test
public void testUnallocatedEmptyLargeListOffsetBuffer() {
try (LargeListVector list = LargeListVector.empty("list", allocator)) {
list.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
list.setValueCount(0);

assertEmptyLargeListOffsetBuffer(list);
}
}

@Test
public void testSplitAndTransferEmptyLargeListOffsetBuffer() {
try (LargeListVector source = LargeListVector.empty("source", allocator);
LargeListVector target = LargeListVector.empty("target", allocator)) {
source.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
target.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
source.allocateNew();
source.setValueCount(0);

TransferPair transferPair = source.makeTransferPair(target);
transferPair.splitAndTransfer(0, 0);

assertEmptyLargeListOffsetBuffer(target);
}
}

private ArrowBuf assertEmptyLargeListOffsetBuffer(LargeListVector list) {
List<ArrowBuf> buffers = list.getFieldBuffers();
ArrowBuf offsetBuffer = buffers.get(1);
assertEquals(LargeListVector.OFFSET_WIDTH, offsetBuffer.readableBytes());
assertTrue(offsetBuffer.capacity() >= LargeListVector.OFFSET_WIDTH);
assertEquals(0L, offsetBuffer.getLong(0));
return offsetBuffer;
}

@Test
public void testIsEmpty() {
try (final LargeListVector vector = LargeListVector.empty("list", allocator)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,52 @@ public void testGetBufferSizeFor() {
}
}

@Test
public void testEmptyListOffsetBuffer() {
try (ListVector list = ListVector.empty("list", allocator)) {
list.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
list.allocateNew();
list.setValueCount(0);

assertEmptyListOffsetBuffer(list);
}
}

@Test
public void testUnallocatedEmptyListOffsetBuffer() {
try (ListVector list = ListVector.empty("list", allocator)) {
list.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
list.setValueCount(0);

assertEmptyListOffsetBuffer(list);
}
}

@Test
public void testSplitAndTransferEmptyListOffsetBuffer() {
try (ListVector source = ListVector.empty("source", allocator);
ListVector target = ListVector.empty("target", allocator)) {
source.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
target.addOrGetVector(FieldType.nullable(MinorType.INT.getType()));
source.allocateNew();
source.setValueCount(0);

TransferPair transferPair = source.makeTransferPair(target);
transferPair.splitAndTransfer(0, 0);

assertEmptyListOffsetBuffer(target);
}
}

private ArrowBuf assertEmptyListOffsetBuffer(ListVector list) {
List<ArrowBuf> buffers = list.getFieldBuffers();
ArrowBuf offsetBuffer = buffers.get(1);
assertEquals(BaseRepeatedValueVector.OFFSET_WIDTH, offsetBuffer.readableBytes());
assertTrue(offsetBuffer.capacity() >= BaseRepeatedValueVector.OFFSET_WIDTH);
assertEquals(0, offsetBuffer.getInt(0));
return offsetBuffer;
}

@Test
public void testIsEmpty() {
try (final ListVector vector = ListVector.empty("list", allocator)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.complex.BaseRepeatedValueVector;
import org.apache.arrow.vector.complex.DenseUnionVector;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.LargeListVector;
import org.apache.arrow.vector.complex.LargeListViewVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
Expand Down Expand Up @@ -115,6 +118,8 @@ public void testWithEmptyVector() {
TransferPair transferPair = listVector.getTransferPair(allocator);
transferPair.splitAndTransfer(0, 0);
assertEquals(0, transferPair.getTo().getValueCount());
transferPair.getTo().clear();
listVector.clear();
// BaseFixedWidthVector
IntVector intVector = new IntVector("", allocator);
transferPair = intVector.getTransferPair(allocator);
Expand Down Expand Up @@ -911,6 +916,11 @@ public void testListVectorZeroStartIndexAndLength() {

tp.splitAndTransfer(0, 0);
assertEquals(valueCount, newListVector.getValueCount());
List<ArrowBuf> buffers = newListVector.getFieldBuffers();
ArrowBuf offsetBuffer = buffers.get(1);
assertEquals(BaseRepeatedValueVector.OFFSET_WIDTH, offsetBuffer.readableBytes());
assertTrue(offsetBuffer.capacity() >= BaseRepeatedValueVector.OFFSET_WIDTH);
assertEquals(0, offsetBuffer.getInt(0));

newListVector.clear();
}
Expand All @@ -935,6 +945,29 @@ public void testLargeListViewVectorZeroStartIndexAndLength() {
}
}

@Test
public void testLargeListVectorZeroStartIndexAndLength() {
try (final LargeListVector listVector = LargeListVector.empty("largelist", allocator);
final LargeListVector newListVector = LargeListVector.empty("newList", allocator)) {

listVector.allocateNew();
final int valueCount = 0;
listVector.setValueCount(valueCount);

final TransferPair tp = listVector.makeTransferPair(newListVector);

tp.splitAndTransfer(0, 0);
assertEquals(valueCount, newListVector.getValueCount());
List<ArrowBuf> buffers = newListVector.getFieldBuffers();
ArrowBuf offsetBuffer = buffers.get(1);
assertEquals(LargeListVector.OFFSET_WIDTH, offsetBuffer.readableBytes());
assertTrue(offsetBuffer.capacity() >= LargeListVector.OFFSET_WIDTH);
assertEquals(0L, offsetBuffer.getLong(0));

newListVector.clear();
}
}

@Test
public void testStructVectorZeroStartIndexAndLength() {
Map<String, String> metadata = new HashMap<>();
Expand Down
Loading