Skip to content

Commit eb75160

Browse files
committed
Track the buffers and close all the resources in tests
1 parent a494af3 commit eb75160

17 files changed

Lines changed: 349 additions & 344 deletions

parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020

2121
import java.nio.ByteBuffer;
2222
import java.util.HashMap;
23-
import java.util.HashSet;
2423
import java.util.Map;
2524
import java.util.Objects;
26-
import java.util.Set;
2725

2826
/**
2927
* A wrapper {@link ByteBufferAllocator} implementation that tracks whether all allocated buffers are released. It
@@ -51,11 +49,7 @@ private static class Key {
5149
private final ByteBuffer buffer;
5250

5351
Key(ByteBuffer buffer) {
54-
if (!buffer.isDirect() && buffer.hasArray()) {
55-
hashCode = System.identityHashCode(buffer.array());
56-
} else {
57-
hashCode = System.identityHashCode(buffer);
58-
}
52+
hashCode = System.identityHashCode(buffer);
5953
this.buffer = buffer;
6054
}
6155

@@ -68,9 +62,6 @@ public boolean equals(Object o) {
6862
return false;
6963
}
7064
Key key = (Key) o;
71-
if (!buffer.isDirect() && buffer.hasArray() && !key.buffer.isDirect() && key.buffer.hasArray()) {
72-
return buffer.array() == key.buffer.array();
73-
}
7465
return this.buffer == key.buffer;
7566
}
7667

@@ -133,7 +124,6 @@ private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceExcep
133124
}
134125

135126
private final Map<Key, ByteBufferAllocationStacktraceException> allocated = new HashMap<>();
136-
private final Set<Object> releasedArrays = new HashSet<>();
137127
private final ByteBufferAllocator allocator;
138128

139129
private TrackingByteBufferAllocator(ByteBufferAllocator allocator) {
@@ -150,19 +140,12 @@ public ByteBuffer allocate(int size) {
150140
@Override
151141
public void release(ByteBuffer b) throws ReleasingUnallocatedByteBufferException {
152142
Objects.requireNonNull(b);
153-
if (allocated.remove(new Key(b)) != null) {
154-
allocator.release(b);
155-
if (!b.isDirect() && b.hasArray()) {
156-
releasedArrays.add(b.array());
157-
}
158-
b.clear();
159-
return;
160-
}
161-
if (!b.isDirect() && b.hasArray() && releasedArrays.contains(b.array())) {
162-
b.clear();
163-
return;
143+
if (allocated.remove(new Key(b)) == null) {
144+
throw new ReleasingUnallocatedByteBufferException();
164145
}
165-
throw new ReleasingUnallocatedByteBufferException();
146+
allocator.release(b);
147+
// Clearing the buffer so subsequent access would probably generate errors
148+
b.clear();
166149
}
167150

168151
@Override
@@ -171,12 +154,12 @@ public boolean isDirect() {
171154
}
172155

173156
@Override
174-
public void close() {
175-
// Release all remaining buffers through the underlying allocator
176-
// so they are properly freed (e.g. direct memory cleanup).
177-
for (Key key : allocated.keySet()) {
178-
allocator.release(key.buffer);
157+
public void close() throws LeakedByteBufferException {
158+
if (!allocated.isEmpty()) {
159+
LeakedByteBufferException ex = new LeakedByteBufferException(
160+
allocated.size(), allocated.values().iterator().next());
161+
allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd
162+
throw ex;
179163
}
180-
allocated.clear();
181164
}
182165
}

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.parquet.HadoopReadOptions;
6666
import org.apache.parquet.ParquetReadOptions;
6767
import org.apache.parquet.Preconditions;
68+
import org.apache.parquet.bytes.ByteBufferAllocator;
6869
import org.apache.parquet.bytes.ByteBufferInputStream;
6970
import org.apache.parquet.bytes.ByteBufferReleaser;
7071
import org.apache.parquet.bytes.BytesInput;
@@ -1361,8 +1362,30 @@ private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder b
13611362
totalSize += len;
13621363
}
13631364
LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
1364-
// Request a vectored read;
1365-
f.readVectored(ranges, options.getAllocator());
1365+
// Request a vectored read; track all buffers allocated during the call so that
1366+
// internal buffers (e.g. from ChecksumFileSystem) are also released.
1367+
List<ByteBuffer> allocatedBuffers = new ArrayList<>();
1368+
ByteBufferAllocator allocator = options.getAllocator();
1369+
ByteBufferAllocator trackingAllocator = new ByteBufferAllocator() {
1370+
@Override
1371+
public ByteBuffer allocate(int size) {
1372+
ByteBuffer buf = allocator.allocate(size);
1373+
allocatedBuffers.add(buf);
1374+
return buf;
1375+
}
1376+
1377+
@Override
1378+
public void release(ByteBuffer b) {
1379+
allocator.release(b);
1380+
}
1381+
1382+
@Override
1383+
public boolean isDirect() {
1384+
return allocator.isDirect();
1385+
}
1386+
};
1387+
f.readVectored(ranges, trackingAllocator);
1388+
builder.addBuffersToRelease(allocatedBuffers);
13661389
int k = 0;
13671390
for (ConsecutivePartList consecutivePart : allParts) {
13681391
ParquetFileRange currRange = ranges.get(k++);
@@ -2327,7 +2350,6 @@ public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder b
23272350
LOG.error(error, e);
23282351
throw new IOException(error, e);
23292352
}
2330-
builder.addBuffersToRelease(Collections.singletonList(buffer));
23312353
ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
23322354
for (ChunkDescriptor descriptor : chunks) {
23332355
builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);

parquet-hadoop/src/test/java/org/apache/parquet/crypto/propertiesfactory/SchemaControlEncryptionTest.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -167,21 +167,21 @@ private String encryptParquetFile(String file, Configuration conf) throws IOExce
167167
}
168168

169169
private void decryptParquetFileAndValid(String file, Configuration conf) throws IOException {
170-
ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(file))
170+
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(file))
171171
.withConf(conf)
172-
.build();
173-
for (int i = 0; i < numRecord; i++) {
174-
Group group = reader.read();
175-
assertEquals(testData.get("Name")[i], group.getBinary("Name", 0).toStringUsingUTF8());
176-
assertEquals(testData.get("Age")[i], group.getLong("Age", 0));
177-
178-
Group subGroup = group.getGroup("WebLinks", 0);
179-
assertArrayEquals(
180-
subGroup.getBinary("LinkedIn", 0).getBytes(), ((String) testData.get("LinkedIn")[i]).getBytes());
181-
assertArrayEquals(
182-
subGroup.getBinary("Twitter", 0).getBytes(), ((String) testData.get("Twitter")[i]).getBytes());
172+
.build()) {
173+
for (int i = 0; i < numRecord; i++) {
174+
Group group = reader.read();
175+
assertEquals(testData.get("Name")[i], group.getBinary("Name", 0).toStringUsingUTF8());
176+
assertEquals(testData.get("Age")[i], group.getLong("Age", 0));
177+
178+
Group subGroup = group.getGroup("WebLinks", 0);
179+
assertArrayEquals(
180+
subGroup.getBinary("LinkedIn", 0).getBytes(), ((String) testData.get("LinkedIn")[i]).getBytes());
181+
assertArrayEquals(
182+
subGroup.getBinary("Twitter", 0).getBytes(), ((String) testData.get("Twitter")[i]).getBytes());
183+
}
183184
}
184-
reader.close();
185185
}
186186

187187
private static String createTempFile(String prefix) {

parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ private void writeValuesToFile(
254254
SimpleGroupFactory message = new SimpleGroupFactory(schema);
255255
GroupWriteSupport.setSchema(schema, configuration);
256256

257-
ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
257+
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
258258
.withAllocator(allocator)
259259
.withCompressionCodec(compression)
260260
.withRowGroupSize(rowGroupSize)
@@ -263,36 +263,35 @@ private void writeValuesToFile(
263263
.withDictionaryEncoding(enableDictionary)
264264
.withWriterVersion(version)
265265
.withConf(configuration)
266-
.build();
267-
268-
for (Object o : values) {
269-
switch (type) {
270-
case BOOLEAN:
271-
writer.write(message.newGroup().append("field", (Boolean) o));
272-
break;
273-
case INT32:
274-
writer.write(message.newGroup().append("field", (Integer) o));
275-
break;
276-
case INT64:
277-
writer.write(message.newGroup().append("field", (Long) o));
278-
break;
279-
case FLOAT:
280-
writer.write(message.newGroup().append("field", (Float) o));
281-
break;
282-
case DOUBLE:
283-
writer.write(message.newGroup().append("field", (Double) o));
284-
break;
285-
case INT96:
286-
case BINARY:
287-
case FIXED_LEN_BYTE_ARRAY:
288-
writer.write(message.newGroup().append("field", (Binary) o));
289-
break;
290-
default:
291-
throw new IllegalArgumentException("Unknown type name: " + type);
266+
.build()) {
267+
268+
for (Object o : values) {
269+
switch (type) {
270+
case BOOLEAN:
271+
writer.write(message.newGroup().append("field", (Boolean) o));
272+
break;
273+
case INT32:
274+
writer.write(message.newGroup().append("field", (Integer) o));
275+
break;
276+
case INT64:
277+
writer.write(message.newGroup().append("field", (Long) o));
278+
break;
279+
case FLOAT:
280+
writer.write(message.newGroup().append("field", (Float) o));
281+
break;
282+
case DOUBLE:
283+
writer.write(message.newGroup().append("field", (Double) o));
284+
break;
285+
case INT96:
286+
case BINARY:
287+
case FIXED_LEN_BYTE_ARRAY:
288+
writer.write(message.newGroup().append("field", (Binary) o));
289+
break;
290+
default:
291+
throw new IllegalArgumentException("Unknown type name: " + type);
292+
}
292293
}
293294
}
294-
295-
writer.close();
296295
}
297296

298297
private List<?> generateRandomValues(PrimitiveTypeName type, int count) {
@@ -522,16 +521,17 @@ private static List<PageReadStore> readBlocksFromFile(Path file) throws IOExcept
522521

523522
ParquetMetadata metadata =
524523
ParquetFileReader.readFooter(configuration, file, ParquetMetadataConverter.NO_FILTER);
525-
ParquetFileReader fileReader = new ParquetFileReader(
524+
try (ParquetFileReader fileReader = new ParquetFileReader(
526525
configuration,
527526
metadata.getFileMetaData(),
528527
file,
529528
metadata.getBlocks(),
530-
metadata.getFileMetaData().getSchema().getColumns());
529+
metadata.getFileMetaData().getSchema().getColumns())) {
531530

532-
PageReadStore group;
533-
while ((group = fileReader.readNextRowGroup()) != null) {
534-
rowGroups.add(group);
531+
PageReadStore group;
532+
while ((group = fileReader.readNextRowGroup()) != null) {
533+
rowGroups.add(group);
534+
}
535535
}
536536

537537
return rowGroups;

parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -212,17 +212,13 @@ public void testOrMissingColumnFilter() throws Exception {
212212
}
213213

214214
public static long countFilteredRecords(Path path, FilterPredicate pred) throws IOException {
215-
ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
216-
.withFilter(FilterCompat.get(pred))
217-
.build();
218-
219215
long count = 0;
220-
try {
216+
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
217+
.withFilter(FilterCompat.get(pred))
218+
.build()) {
221219
while (reader.read() != null) {
222220
count += 1;
223221
}
224-
} finally {
225-
reader.close();
226222
}
227223
return count;
228224
}

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -215,36 +215,35 @@ public void test(Configuration config, ByteBufferAllocator allocator) throws Exc
215215

216216
{
217217
ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
218-
ParquetFileReader reader = new ParquetFileReader(
219-
config, footer.getFileMetaData(), file, footer.getBlocks(), schema.getColumns());
220-
PageReadStore rowGroup = reader.readNextRowGroup();
221-
PageReader pageReader = rowGroup.getPageReader(col);
222-
DataPageV2 page = (DataPageV2) pageReader.readPage();
223-
assertEquals(rowCount, page.getRowCount());
224-
assertEquals(nullCount, page.getNullCount());
225-
assertEquals(valueCount, page.getValueCount());
226-
assertEquals(d, intValue(page.getDefinitionLevels()));
227-
assertEquals(r, intValue(page.getRepetitionLevels()));
228-
assertEquals(dataEncoding, page.getDataEncoding());
229-
assertEquals(v, intValue(page.getData()));
218+
try (ParquetFileReader reader = new ParquetFileReader(
219+
config, footer.getFileMetaData(), file, footer.getBlocks(), schema.getColumns())) {
220+
PageReadStore rowGroup = reader.readNextRowGroup();
221+
PageReader pageReader = rowGroup.getPageReader(col);
222+
DataPageV2 page = (DataPageV2) pageReader.readPage();
223+
assertEquals(rowCount, page.getRowCount());
224+
assertEquals(nullCount, page.getNullCount());
225+
assertEquals(valueCount, page.getValueCount());
226+
assertEquals(d, intValue(page.getDefinitionLevels()));
227+
assertEquals(r, intValue(page.getRepetitionLevels()));
228+
assertEquals(dataEncoding, page.getDataEncoding());
229+
assertEquals(v, intValue(page.getData()));
230230

231-
// Checking column/offset indexes for the one page
232-
ColumnChunkMetaData column = footer.getBlocks().get(0).getColumns().get(0);
233-
ColumnIndex columnIndex = reader.readColumnIndex(column);
234-
assertArrayEquals(
235-
statistics.getMinBytes(), columnIndex.getMinValues().get(0).array());
236-
assertArrayEquals(
237-
statistics.getMaxBytes(), columnIndex.getMaxValues().get(0).array());
238-
assertEquals(
239-
statistics.getNumNulls(), columnIndex.getNullCounts().get(0).longValue());
240-
assertFalse(columnIndex.getNullPages().get(0));
241-
OffsetIndex offsetIndex = reader.readOffsetIndex(column);
242-
assertEquals(1, offsetIndex.getPageCount());
243-
assertEquals(pageSize, offsetIndex.getCompressedPageSize(0));
244-
assertEquals(0, offsetIndex.getFirstRowIndex(0));
245-
assertEquals(pageOffset, offsetIndex.getOffset(0));
246-
247-
reader.close();
231+
// Checking column/offset indexes for the one page
232+
ColumnChunkMetaData column = footer.getBlocks().get(0).getColumns().get(0);
233+
ColumnIndex columnIndex = reader.readColumnIndex(column);
234+
assertArrayEquals(
235+
statistics.getMinBytes(), columnIndex.getMinValues().get(0).array());
236+
assertArrayEquals(
237+
statistics.getMaxBytes(), columnIndex.getMaxValues().get(0).array());
238+
assertEquals(
239+
statistics.getNumNulls(), columnIndex.getNullCounts().get(0).longValue());
240+
assertFalse(columnIndex.getNullPages().get(0));
241+
OffsetIndex offsetIndex = reader.readOffsetIndex(column);
242+
assertEquals(1, offsetIndex.getPageCount());
243+
assertEquals(pageSize, offsetIndex.getCompressedPageSize(0));
244+
assertEquals(0, offsetIndex.getFirstRowIndex(0));
245+
assertEquals(pageOffset, offsetIndex.getOffset(0));
246+
}
248247
}
249248
}
250249

0 commit comments

Comments
 (0)