Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion c++/include/orc/MemoryPool.hh
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ namespace orc {
uint64_t currentSize_;
// maximal capacity (actual allocated memory)
uint64_t currentCapacity_;
// whether this buffer owns memory lifecycle
bool ownBuffer_;

// not implemented
DataBuffer(DataBuffer& buffer);
DataBuffer& operator=(DataBuffer& buffer);

public:
DataBuffer(MemoryPool& pool, uint64_t size = 0);
DataBuffer(MemoryPool& pool, uint64_t size = 0, bool ownBuf = true);

Comment thread
lucasfang marked this conversation as resolved.
DataBuffer(DataBuffer<T>&& buffer) noexcept;

Expand Down Expand Up @@ -81,6 +83,9 @@ namespace orc {
void reserve(uint64_t size);
void resize(uint64_t size);
void zeroOut();

// Attach to an external raw buffer. bufSize is in bytes.
void setData(T* buf, size_t bufSize);
};

// Specializations for char
Expand Down
88 changes: 73 additions & 15 deletions c++/src/MemoryPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string.h>
#include <cstdlib>
#include <iostream>
#include <type_traits>

namespace orc {

Expand Down Expand Up @@ -52,35 +53,47 @@ namespace orc {
}

template <class T>
DataBuffer<T>::DataBuffer(MemoryPool& pool, uint64_t newSize)
: memoryPool_(pool), buf_(nullptr), currentSize_(0), currentCapacity_(0) {
reserve(newSize);
currentSize_ = newSize;
DataBuffer<T>::DataBuffer(MemoryPool& pool, uint64_t newSize, bool ownBuf)
: memoryPool_(pool), buf_(nullptr), currentSize_(0), currentCapacity_(0), ownBuffer_(ownBuf) {
if (ownBuffer_) {
reserve(newSize);
currentSize_ = newSize;
}
}

template <class T>
DataBuffer<T>::DataBuffer(DataBuffer<T>&& buffer) noexcept
: memoryPool_(buffer.memoryPool_),
buf_(buffer.buf_),
currentSize_(buffer.currentSize_),
currentCapacity_(buffer.currentCapacity_) {
currentCapacity_(buffer.currentCapacity_),
ownBuffer_(buffer.ownBuffer_) {
buffer.buf_ = nullptr;
buffer.currentSize_ = 0;
buffer.currentCapacity_ = 0;
buffer.ownBuffer_ = true;
}

template <class T>
DataBuffer<T>::~DataBuffer() {
if (!ownBuffer_) {
return;
}
for (uint64_t i = currentSize_; i > 0; --i) {
(buf_ + i - 1)->~T();
}
if (buf_) {
static_assert(std::is_trivially_copyable<T>::value,
"Only trivially copyable type is supported for DataBuffer reserve");
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
Comment thread
lucasfang marked this conversation as resolved.
}

template <class T>
void DataBuffer<T>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (currentSize_ > newSize) {
for (uint64_t i = currentSize_; i > newSize; --i) {
Expand All @@ -96,6 +109,9 @@ namespace orc {

template <class T>
void DataBuffer<T>::reserve(uint64_t newCapacity) {
if (!ownBuffer_) {
return;
}
if (newCapacity > currentCapacity_ || !buf_) {
if (buf_) {
T* buf_old = buf_;
Expand All @@ -114,6 +130,18 @@ namespace orc {
memset(buf_, 0, sizeof(T) * currentCapacity_);
}

template <class T>
void DataBuffer<T>::setData(T* buffer, size_t bufSize) {
if (ownBuffer_ && buf_) {
static_assert(std::is_trivially_copyable<T>::value,
"Only trivially copyable type is supported for DataBuffer reserve");
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
ownBuffer_ = false;
buf_ = buffer;
currentSize_ = currentCapacity_ = static_cast<uint64_t>(bufSize / sizeof(T));
}
Comment thread
lucasfang marked this conversation as resolved.

// Specializations for Int128
template <>
void DataBuffer<Int128>::zeroOut() {
Expand All @@ -126,13 +154,16 @@ namespace orc {

template <>
DataBuffer<char>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<char>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, newSize - currentSize_);
Expand All @@ -144,13 +175,16 @@ namespace orc {

template <>
DataBuffer<char*>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<char*>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(char*));
Expand All @@ -162,13 +196,16 @@ namespace orc {

template <>
DataBuffer<double>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<double>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(double));
Expand All @@ -180,13 +217,16 @@ namespace orc {

template <>
DataBuffer<float>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<float>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(float));
Expand All @@ -198,13 +238,16 @@ namespace orc {

template <>
DataBuffer<int64_t>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<int64_t>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int64_t));
Expand All @@ -216,13 +259,16 @@ namespace orc {

template <>
DataBuffer<int32_t>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<int32_t>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int32_t));
Expand All @@ -234,13 +280,16 @@ namespace orc {

template <>
DataBuffer<int16_t>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<int16_t>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int16_t));
Expand All @@ -252,13 +301,16 @@ namespace orc {

template <>
DataBuffer<int8_t>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<int8_t>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(int8_t));
Expand All @@ -270,13 +322,16 @@ namespace orc {

template <>
DataBuffer<uint64_t>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<uint64_t>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, (newSize - currentSize_) * sizeof(uint64_t));
Expand All @@ -288,13 +343,16 @@ namespace orc {

template <>
DataBuffer<unsigned char>::~DataBuffer() {
if (buf_) {
if (ownBuffer_ && buf_) {
memoryPool_.free(reinterpret_cast<char*>(buf_));
}
}

template <>
void DataBuffer<unsigned char>::resize(uint64_t newSize) {
if (!ownBuffer_) {
return;
}
reserve(newSize);
if (newSize > currentSize_) {
memset(buf_ + currentSize_, 0, newSize - currentSize_);
Expand Down
37 changes: 37 additions & 0 deletions c++/test/TestCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@

namespace orc {

class CountingMemoryPool : public MemoryPool {
public:
uint64_t allocCount = 0;
uint64_t freeCount = 0;

char* malloc(uint64_t size) override {
++allocCount;
return static_cast<char*>(std::malloc(size));
}

void free(char* p) override {
++freeCount;
std::free(p);
}
Comment thread
lucasfang marked this conversation as resolved.
};

TEST(TestReadRangeCombiner, testBasics) {
ReadRangeCombiner combinator{0, 100};
/// Ranges with partial overlap and identical offsets
Expand Down Expand Up @@ -139,4 +155,25 @@ namespace orc {
slice = cache.read({20, 2});
assert_slice_equal(slice, "uv");
}

TEST(TestDataBuffer, testExternalBufferNonOwning) {
CountingMemoryPool pool;
char external[16] = {0};
uint64_t freeCountAfterSetData = 0;

{
DataBuffer<char> buffer(pool, 0);
buffer.setData(external, sizeof(external));
freeCountAfterSetData = pool.freeCount;

// Non-owning buffer should keep external size and ignore resize.
EXPECT_EQ(sizeof(external), buffer.size());
buffer.resize(8);
EXPECT_EQ(sizeof(external), buffer.size());
}

// setData may release previously owned internal memory, but destruction should not free
// external.
EXPECT_EQ(freeCountAfterSetData, pool.freeCount);
}
} // namespace orc
Loading