Skip to content

Commit 1df5a26

Browse files
Deepak Majeti authored and nongli committed
PARQUET-538: Improve ColumnReader Tests
closes apache#43 and closes apache#50

This PR also implements
1) PARQUET-532: Null values detection needs to be fixed and tested
2) PARQUET-502: Scanner segfaults when its batch size is smaller than the number of rows
3) PARQUET-526: Add more complete unit test coverage for column Scanner implementations
4) PARQUET-531: Can't read past first page in a column

Author: Deepak Majeti <deepak.majeti@hp.com>

Closes apache#62 from majetideepak/PARQUET-538 and squashes the following commits:

1e56f83 [Deepak Majeti] Trigger notification
6478a7c [Deepak Majeti] TYPED_TEST
1d14171 [Deepak Majeti] Added Boolean Test and Scanner:Next API
d1da031 [Deepak Majeti] lint issue
45f10aa [Deepak Majeti] Reproducer for PARQUET-502
88e27c6 [Deepak Majeti] formatting
8aac435 [Deepak Majeti] PARQUET-526
dca7e2d [Deepak Majeti] PARQUET-532 and PARQUET-502 Fix
a622021 [Deepak Majeti] Reverted PARQUET-524 and addressed comments
859c1df [Deepak Majeti] minor comment edits
d938a13 [Deepak Majeti] PARQUET-538
df1fbd7 [Deepak Majeti] Templated single page tests
8548e3c [Deepak Majeti] PARQUET-524
c265fea [Deepak Majeti] fixed PARQUET-499 bugs
1 parent 35a48fb commit 1df5a26

11 files changed

Lines changed: 508 additions & 211 deletions

File tree

src/parquet/column/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,3 +25,4 @@ install(FILES
2525

2626
ADD_PARQUET_TEST(column-reader-test)
2727
ADD_PARQUET_TEST(levels-test)
28+
ADD_PARQUET_TEST(scanner-test)

src/parquet/column/column-reader-test.cc

Lines changed: 109 additions & 172 deletions
Original file line number | Diff line number | Diff line change
@@ -44,195 +44,132 @@ namespace test {
4444

4545
class TestPrimitiveReader : public ::testing::Test {
4646
public:
47-
void SetUp() {}
48-
49-
void TearDown() {}
47+
void MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page) {
48+
num_levels_ = levels_per_page * num_pages;
49+
num_values_ = 0;
50+
uint32_t seed = 0;
51+
int16_t zero = 0;
52+
vector<int> values_per_page(num_pages, levels_per_page);
53+
// Create definition levels
54+
if (max_def_level_ > 0) {
55+
def_levels_.resize(num_levels_);
56+
random_numbers(num_levels_, seed, zero, max_def_level_, def_levels_.data());
57+
for (int p = 0; p < num_pages; p++) {
58+
int num_values_per_page = 0;
59+
for (int i = 0; i < levels_per_page; i++) {
60+
if (def_levels_[i + p * levels_per_page] == max_def_level_) {
61+
num_values_per_page++;
62+
num_values_++;
63+
}
64+
}
65+
values_per_page[p] = num_values_per_page;
66+
}
67+
} else {
68+
num_values_ = num_levels_;
69+
}
70+
// Create repetition levels
71+
if (max_rep_level_ > 0) {
72+
rep_levels_.resize(num_levels_);
73+
random_numbers(num_levels_, seed, zero, max_rep_level_, rep_levels_.data());
74+
}
75+
// Create values
76+
values_.resize(num_values_);
77+
random_numbers(num_values_, seed, std::numeric_limits<int32_t>::min(),
78+
std::numeric_limits<int32_t>::max(), values_.data());
79+
Paginate<Type::INT32, int32_t>(d, values_, def_levels_, max_def_level_,
80+
rep_levels_, max_rep_level_, levels_per_page, values_per_page, pages_);
81+
}
5082

51-
void InitReader(const ColumnDescriptor* descr) {
83+
void InitReader(const ColumnDescriptor* d) {
84+
std::unique_ptr<PageReader> pager_;
5285
pager_.reset(new test::MockPageReader(pages_));
53-
reader_ = ColumnReader::Make(descr, std::move(pager_));
86+
reader_ = ColumnReader::Make(d, std::move(pager_));
87+
}
88+
89+
void CheckResults() {
90+
vector<int32_t> vresult(num_values_, -1);
91+
vector<int16_t> dresult(num_levels_, -1);
92+
vector<int16_t> rresult(num_levels_, -1);
93+
size_t values_read = 0;
94+
size_t total_values_read = 0;
95+
size_t batch_actual = 0;
96+
97+
Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
98+
int32_t batch_size = 8;
99+
size_t batch = 0;
100+
// This will cover both the cases
101+
// 1) batch_size < page_size (multiple ReadBatch from a single page)
102+
// 2) batch_size > page_size (ReadBatch limits to a single page)
103+
do {
104+
batch = reader->ReadBatch(batch_size, &dresult[0] + batch_actual,
105+
&rresult[0] + batch_actual, &vresult[0] + total_values_read, &values_read);
106+
total_values_read += values_read;
107+
batch_actual += batch;
108+
batch_size = std::max(batch_size * 2, 4096);
109+
} while (batch > 0);
110+
111+
ASSERT_EQ(num_levels_, batch_actual);
112+
ASSERT_EQ(num_values_, total_values_read);
113+
ASSERT_TRUE(vector_equal(values_, vresult));
114+
if (max_def_level_ > 0) {
115+
ASSERT_TRUE(vector_equal(def_levels_, dresult));
116+
}
117+
if (max_rep_level_ > 0) {
118+
ASSERT_TRUE(vector_equal(rep_levels_, rresult));
119+
}
120+
// catch improper writes at EOS
121+
batch_actual = reader->ReadBatch(5, nullptr, nullptr, nullptr, &values_read);
122+
ASSERT_EQ(0, batch_actual);
123+
ASSERT_EQ(0, values_read);
124+
}
125+
126+
void execute(int num_pages, int levels_page, const ColumnDescriptor *d) {
127+
MakePages(d, num_pages, levels_page);
128+
InitReader(d);
129+
CheckResults();
54130
}
55131

56132
protected:
57-
std::shared_ptr<ColumnReader> reader_;
58-
std::unique_ptr<PageReader> pager_;
133+
int num_levels_;
134+
int num_values_;
135+
int16_t max_def_level_;
136+
int16_t max_rep_level_;
59137
vector<shared_ptr<Page> > pages_;
138+
std::shared_ptr<ColumnReader> reader_;
139+
vector<int32_t> values_;
140+
vector<int16_t> def_levels_;
141+
vector<int16_t> rep_levels_;
60142
};
61143

62-
63144
TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
64-
vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
65-
66-
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, {}, 0,
67-
{}, 0);
68-
pages_.push_back(page);
69-
145+
int levels_per_page = 100;
146+
int num_pages = 50;
147+
max_def_level_ = 0;
148+
max_rep_level_ = 0;
70149
NodePtr type = schema::Int32("a", Repetition::REQUIRED);
71-
ColumnDescriptor descr(type, 0, 0);
72-
InitReader(&descr);
73-
74-
Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
75-
76-
vector<int32_t> result(10, -1);
77-
78-
size_t values_read = 0;
79-
size_t batch_actual = reader->ReadBatch(10, nullptr, nullptr,
80-
&result[0], &values_read);
81-
ASSERT_EQ(10, batch_actual);
82-
ASSERT_EQ(10, values_read);
83-
84-
ASSERT_TRUE(vector_equal(result, values));
150+
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
151+
execute(num_pages, levels_per_page, &descr);
85152
}
86153

87-
88154
TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
89-
vector<int32_t> values = {1, 2, 3, 4, 5};
90-
vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};
91-
92-
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, def_levels, 1,
93-
{}, 0);
94-
95-
pages_.push_back(page);
96-
97-
NodePtr type = schema::Int32("a", Repetition::OPTIONAL);
98-
ColumnDescriptor descr(type, 1, 0);
99-
InitReader(&descr);
100-
101-
Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
102-
103-
size_t values_read = 0;
104-
size_t batch_actual = 0;
105-
106-
vector<int32_t> vresult(3, -1);
107-
vector<int16_t> dresult(5, -1);
108-
109-
batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
110-
&vresult[0], &values_read);
111-
ASSERT_EQ(5, batch_actual);
112-
ASSERT_EQ(3, values_read);
113-
114-
ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3)));
115-
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5)));
116-
117-
batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
118-
&vresult[0], &values_read);
119-
ASSERT_EQ(5, batch_actual);
120-
ASSERT_EQ(2, values_read);
121-
122-
ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5)));
123-
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10)));
124-
125-
// EOS, pass all nullptrs to check for improper writes. Do not segfault /
126-
// core dump
127-
batch_actual = reader->ReadBatch(5, nullptr, nullptr,
128-
nullptr, &values_read);
129-
ASSERT_EQ(0, batch_actual);
130-
ASSERT_EQ(0, values_read);
155+
int levels_per_page = 100;
156+
int num_pages = 50;
157+
max_def_level_ = 4;
158+
max_rep_level_ = 0;
159+
NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
160+
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
161+
execute(num_pages, levels_per_page, &descr);
131162
}
132163

133164
TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
134-
vector<int32_t> values = {1, 2, 3, 4, 5};
135-
vector<int16_t> def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1};
136-
vector<int16_t> rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1};
137-
138-
std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values,
139-
def_levels, 2, rep_levels, 1);
140-
141-
pages_.push_back(page);
142-
143-
NodePtr type = schema::Int32("a", Repetition::REPEATED);
144-
ColumnDescriptor descr(type, 2, 1);
145-
InitReader(&descr);
146-
147-
Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
148-
149-
size_t values_read = 0;
150-
size_t batch_actual = 0;
151-
152-
vector<int32_t> vresult(3, -1);
153-
vector<int16_t> dresult(5, -1);
154-
vector<int16_t> rresult(5, -1);
155-
156-
batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
157-
&vresult[0], &values_read);
158-
ASSERT_EQ(5, batch_actual);
159-
ASSERT_EQ(3, values_read);
160-
161-
ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3)));
162-
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5)));
163-
ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 0, 5)));
164-
165-
batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
166-
&vresult[0], &values_read);
167-
ASSERT_EQ(5, batch_actual);
168-
ASSERT_EQ(2, values_read);
169-
170-
ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5)));
171-
ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10)));
172-
ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 5, 10)));
173-
174-
// EOS, pass all nullptrs to check for improper writes. Do not segfault /
175-
// core dump
176-
batch_actual = reader->ReadBatch(5, nullptr, nullptr,
177-
nullptr, &values_read);
178-
ASSERT_EQ(0, batch_actual);
179-
ASSERT_EQ(0, values_read);
165+
int levels_per_page = 100;
166+
int num_pages = 50;
167+
max_def_level_ = 4;
168+
max_rep_level_ = 2;
169+
NodePtr type = schema::Int32("c", Repetition::REPEATED);
170+
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
171+
execute(num_pages, levels_per_page, &descr);
180172
}
181173

182-
TEST_F(TestPrimitiveReader, TestInt32FlatRepeatedMultiplePages) {
183-
vector<int32_t> values[2] = {{1, 2, 3, 4, 5},
184-
{6, 7, 8, 9, 10}};
185-
vector<int16_t> def_levels[2] = {{2, 1, 1, 2, 2, 1, 1, 2, 2, 1},
186-
{2, 2, 1, 2, 1, 1, 2, 1, 2, 1}};
187-
vector<int16_t> rep_levels[2] = {{0, 1, 1, 0, 0, 1, 1, 0, 0, 1},
188-
{0, 0, 1, 0, 1, 1, 0, 1, 0, 1}};
189-
190-
std::shared_ptr<DataPage> page;
191-
192-
for (int i = 0; i < 4; i++) {
193-
page = MakeDataPage<Type::INT32>(values[i % 2],
194-
def_levels[i % 2], 2, rep_levels[i % 2], 1);
195-
pages_.push_back(page);
196-
}
197-
198-
NodePtr type = schema::Int32("a", Repetition::REPEATED);
199-
ColumnDescriptor descr(type, 2, 1);
200-
InitReader(&descr);
201-
202-
Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
203-
204-
size_t values_read = 0;
205-
size_t batch_actual = 0;
206-
207-
vector<int32_t> vresult(3, -1);
208-
vector<int16_t> dresult(5, -1);
209-
vector<int16_t> rresult(5, -1);
210-
211-
for (int i = 0; i < 4; i++) {
212-
batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
213-
&vresult[0], &values_read);
214-
ASSERT_EQ(5, batch_actual);
215-
ASSERT_EQ(3, values_read);
216-
217-
ASSERT_TRUE(vector_equal(vresult, slice(values[i % 2], 0, 3)));
218-
ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 0, 5)));
219-
ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 0, 5)));
220-
221-
batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
222-
&vresult[0], &values_read);
223-
ASSERT_EQ(5, batch_actual);
224-
ASSERT_EQ(2, values_read);
225-
226-
ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values[i % 2], 3, 5)));
227-
ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 5, 10)));
228-
ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 5, 10)));
229-
}
230-
// EOS, pass all nullptrs to check for improper writes. Do not segfault /
231-
// core dump
232-
batch_actual = reader->ReadBatch(5, nullptr, nullptr,
233-
nullptr, &values_read);
234-
ASSERT_EQ(0, batch_actual);
235-
ASSERT_EQ(0, values_read);
236-
}
237174
} // namespace test
238175
} // namespace parquet_cpp

src/parquet/column/reader.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -122,7 +122,7 @@ class TypedColumnReader : public ColumnReader {
122122
// This API is the same for both V1 and V2 of the DataPage
123123
//
124124
// @returns: actual number of levels read (see values_read for number of values read)
125-
size_t ReadBatch(int batch_size, int16_t* def_levels, int16_t* rep_levels,
125+
size_t ReadBatch(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
126126
T* values, size_t* values_read);
127127

128128
private:

0 commit comments

Comments
 (0)