Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions c_glib/parquet-glib/arrow-file-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,11 @@ GArrowTable *
gparquet_arrow_file_reader_read_table(GParquetArrowFileReader *reader, GError **error)
{
auto parquet_arrow_file_reader = gparquet_arrow_file_reader_get_raw(reader);
std::shared_ptr<arrow::Table> arrow_table;
auto status = parquet_arrow_file_reader->ReadTable(&arrow_table);
if (garrow_error_check(error, status, "[parquet][arrow][file-reader][read-table]")) {
return garrow_table_new_raw(&arrow_table);
auto arrow_table_result = parquet_arrow_file_reader->ReadTable();
if (garrow::check(error,
arrow_table_result,
"[parquet][arrow][file-reader][read-table]")) {
return garrow_table_new_raw(&(*arrow_table_result));
} else {
return NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/examples/arrow/parquet_read_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ arrow::Status ReadFullFile(std::string path_to_file) {

// Read entire file as a single Arrow table
std::shared_ptr<arrow::Table> table;
ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));
ARROW_ASSIGN_OR_RAISE(table, arrow_reader->ReadTable());
return arrow::Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/examples/parquet/parquet_arrow/reader_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void read_whole_file() {
PARQUET_ASSIGN_OR_THROW(reader,
parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
PARQUET_ASSIGN_OR_THROW(table, reader->ReadTable());
std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns()
<< " columns." << std::endl;
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/examples/tutorial_examples/file_access_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ arrow::Status RunMain() {
// (Doc section: Parquet OpenFile)

// (Doc section: Parquet Read)
std::shared_ptr<arrow::Table> parquet_table;
// Read the table.
PARQUET_THROW_NOT_OK(reader->ReadTable(&parquet_table));
std::shared_ptr<arrow::Table> parquet_table;
PARQUET_ASSIGN_OR_THROW(parquet_table, reader->ReadTable());
// (Doc section: Parquet Read)

// (Doc section: Parquet Write)
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,7 @@ TEST_P(DatasetEncryptionTest, ReadSingleFile) {
parquet::arrow::FileReaderBuilder reader_builder;
ASSERT_OK(reader_builder.Open(input, reader_properties));
ASSERT_OK_AND_ASSIGN(auto arrow_reader, reader_builder.Build());
std::shared_ptr<Table> table;
ASSERT_OK(arrow_reader->ReadTable(&table));
ASSERT_OK_AND_ASSIGN(auto table, arrow_reader->ReadTable());

// Check the contents of the table
ASSERT_EQ(table->num_rows(), 2);
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/filesystem/s3fs_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,7 @@ static void ParquetRead(benchmark::State& st, S3FileSystem* fs, const std::strin
ASSERT_OK(builder.properties(properties)->Build(&reader));

if (read_strategy == "ReadTable") {
std::shared_ptr<Table> table;
ASSERT_OK(reader->ReadTable(column_indices, &table));
ASSERT_OK_AND_ASSIGN(auto table, reader->ReadTable(column_indices));
} else {
ASSERT_OK_AND_ASSIGN(auto rb_reader, reader->GetRecordBatchReader(
std::vector<int>{0}, column_indices));
Expand Down
60 changes: 24 additions & 36 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ void DoRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size,
FileReaderBuilder builder;
ASSERT_OK_NO_THROW(builder.Open(std::make_shared<BufferReader>(buffer)));
ASSERT_OK(builder.properties(arrow_reader_properties)->Build(&reader));
ASSERT_OK_NO_THROW(reader->ReadTable(out));
ASSERT_OK_AND_ASSIGN(*out, reader->ReadTable());
}

void CheckConfiguredRoundtrip(
Expand Down Expand Up @@ -486,10 +486,10 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,

reader->set_use_threads(use_threads);
if (column_subset.size() > 0) {
ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
ASSERT_OK_AND_ASSIGN(*out, reader->ReadTable(column_subset));
} else {
// Read everything
ASSERT_OK_NO_THROW(reader->ReadTable(out));
ASSERT_OK_AND_ASSIGN(*out, reader->ReadTable());
}
}

Expand Down Expand Up @@ -709,7 +709,7 @@ class ParquetIOTestBase : public ::testing::Test {

void ReadTableFromFile(std::unique_ptr<FileReader> reader, bool expect_metadata,
std::shared_ptr<Table>* out) {
ASSERT_OK_NO_THROW(reader->ReadTable(out));
ASSERT_OK_AND_ASSIGN(*out, reader->ReadTable());
auto key_value_metadata =
reader->parquet_reader()->metadata()->key_value_metadata().get();
if (!expect_metadata) {
Expand Down Expand Up @@ -2801,8 +2801,7 @@ TEST(TestArrowReadWrite, ReadCoalescedColumnSubset) {
{0, 4, 8, 10}, {0, 1, 2, 3}, {5, 17, 18, 19}};

for (std::vector<int>& column_subset : column_subsets) {
std::shared_ptr<Table> result;
ASSERT_OK(reader->ReadTable(column_subset, &result));
ASSERT_OK_AND_ASSIGN(auto result, reader->ReadTable(column_subset));

std::vector<std::shared_ptr<::arrow::ChunkedArray>> ex_columns;
std::vector<std::shared_ptr<::arrow::Field>> ex_fields;
Expand Down Expand Up @@ -2839,8 +2838,7 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
::arrow::default_memory_pool()));

// Read everything
std::shared_ptr<Table> result;
ASSERT_OK_NO_THROW(reader->ReadTable(&result));
ASSERT_OK_AND_ASSIGN(auto result, reader->ReadTable());
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));

// Read 1 record at a time
Expand Down Expand Up @@ -3579,8 +3577,7 @@ void DoNestedValidate(const std::shared_ptr<::arrow::DataType>& inner_type,
ASSERT_OK(reader_builder.Build(&reader));
ARROW_SCOPED_TRACE("Parquet schema: ",
reader->parquet_reader()->metadata()->schema()->ToString());
std::shared_ptr<Table> result;
ASSERT_OK_NO_THROW(reader->ReadTable(&result));
ASSERT_OK_AND_ASSIGN(auto result, reader->ReadTable());

if (inner_type->id() == ::arrow::Type::DATE64 ||
inner_type->id() == ::arrow::Type::TIMESTAMP ||
Expand Down Expand Up @@ -4036,8 +4033,7 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
TEST_F(TestNestedSchemaRead, ReadIntoTableFull) {
ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::OPTIONAL));

std::shared_ptr<Table> table;
ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
ASSERT_OK_AND_ASSIGN(auto table, reader_->ReadTable());
ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
ASSERT_EQ(table->num_columns(), 2);
ASSERT_EQ(table->schema()->field(0)->type()->num_fields(), 2);
Expand Down Expand Up @@ -4080,7 +4076,7 @@ TEST_F(TestNestedSchemaRead, ReadTablePartial) {
std::shared_ptr<Table> table;

// columns: {group1.leaf1, leaf3}
ASSERT_OK_NO_THROW(reader_->ReadTable({0, 2}, &table));
ASSERT_OK_AND_ASSIGN(table, reader_->ReadTable({0, 2}));
ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
ASSERT_EQ(table->num_columns(), 2);
ASSERT_EQ(table->schema()->field(0)->name(), "group1");
Expand All @@ -4098,23 +4094,23 @@ TEST_F(TestNestedSchemaRead, ReadTablePartial) {
ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));

// columns: {group1.leaf1, group1.leaf2}
ASSERT_OK_NO_THROW(reader_->ReadTable({0, 1}, &table));
ASSERT_OK_AND_ASSIGN(table, reader_->ReadTable({0, 1}));
ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
ASSERT_EQ(table->num_columns(), 1);
ASSERT_EQ(table->schema()->field(0)->name(), "group1");
ASSERT_EQ(table->schema()->field(0)->type()->num_fields(), 2);
ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));

// columns: {leaf3}
ASSERT_OK_NO_THROW(reader_->ReadTable({2}, &table));
ASSERT_OK_AND_ASSIGN(table, reader_->ReadTable({2}));
ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
ASSERT_EQ(table->num_columns(), 1);
ASSERT_EQ(table->schema()->field(0)->name(), "leaf3");
ASSERT_EQ(table->schema()->field(0)->type()->num_fields(), 0);
ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));

// Test with different ordering
ASSERT_OK_NO_THROW(reader_->ReadTable({2, 0}, &table));
ASSERT_OK_AND_ASSIGN(table, reader_->ReadTable({2, 0}));
ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
ASSERT_EQ(table->num_columns(), 2);
ASSERT_EQ(table->schema()->field(0)->name(), "leaf3");
Expand All @@ -4135,8 +4131,7 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
int num_rows = SMALL_SIZE * (depth + 2);
ASSERT_NO_FATAL_FAILURE(CreateMultiLevelNestedParquet(num_trees, depth, num_children,
num_rows, GetParam()));
std::shared_ptr<Table> table;
ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
ASSERT_OK_AND_ASSIGN(auto table, reader_->ReadTable());
ASSERT_EQ(table->num_columns(), num_trees);
ASSERT_EQ(table->num_rows(), num_rows);

Expand Down Expand Up @@ -4184,8 +4179,8 @@ void TryReadDataFile(const std::string& path,
Status s;
auto reader_result = FileReader::Make(pool, ParquetFileReader::OpenFile(path, false));
if (reader_result.ok()) {
std::shared_ptr<::arrow::Table> table;
s = (*reader_result)->ReadTable(&table);
auto table_result = (*reader_result)->ReadTable();
s = table_result.status();
} else {
s = reader_result.status();
}
Expand Down Expand Up @@ -4261,7 +4256,7 @@ TEST(TestArrowReaderAdHoc, LARGE_MEMORY_TEST(LargeStringColumn)) {
auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(tables_buffer));
ASSERT_OK_AND_ASSIGN(auto arrow_reader,
FileReader::Make(default_memory_pool(), std::move(reader)));
ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
ASSERT_OK_AND_ASSIGN(table, arrow_reader->ReadTable());
ASSERT_OK(table->ValidateFull());

// ARROW-9297: ensure RecordBatchReader also works
Expand Down Expand Up @@ -4365,8 +4360,7 @@ TEST(TestArrowReaderAdHoc, LegacyTwoLevelList) {
// Verify Arrow schema and data
ASSERT_OK_AND_ASSIGN(auto reader,
FileReader::Make(default_memory_pool(), std::move(file_reader)));
std::shared_ptr<Table> table;
ASSERT_OK(reader->ReadTable(&table));
ASSERT_OK_AND_ASSIGN(auto table, reader->ReadTable());
ASSERT_OK(table->ValidateFull());
AssertTablesEqual(*expected_table, *table);
};
Expand Down Expand Up @@ -4429,8 +4423,7 @@ TEST_P(TestArrowReaderAdHocSparkAndHvr, ReadDecimals) {

ASSERT_OK_AND_ASSIGN(auto arrow_reader,
FileReader::Make(pool, ParquetFileReader::OpenFile(path, false)));
std::shared_ptr<::arrow::Table> table;
ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
ASSERT_OK_AND_ASSIGN(auto table, arrow_reader->ReadTable());

std::shared_ptr<::arrow::Schema> schema;
ASSERT_OK_NO_THROW(arrow_reader->GetSchema(&schema));
Expand Down Expand Up @@ -4495,8 +4488,7 @@ TEST(TestArrowReaderAdHoc, ReadFloat16Files) {

ASSERT_OK_AND_ASSIGN(
auto reader, FileReader::Make(pool, ParquetFileReader::OpenFile(path, false)));
std::shared_ptr<::arrow::Table> table;
ASSERT_OK_NO_THROW(reader->ReadTable(&table));
ASSERT_OK_AND_ASSIGN(auto table, reader->ReadTable());

std::shared_ptr<::arrow::Schema> schema;
ASSERT_OK_NO_THROW(reader->GetSchema(&schema));
Expand Down Expand Up @@ -4907,8 +4899,7 @@ class TestArrowReadDictionary : public ::testing::TestWithParam<double> {
void CheckReadWholeFile(const Table& expected) {
ASSERT_OK_AND_ASSIGN(auto reader, GetReader());

std::shared_ptr<Table> actual;
ASSERT_OK_NO_THROW(reader->ReadTable(&actual));
ASSERT_OK_AND_ASSIGN(auto actual, reader->ReadTable());
::arrow::AssertTablesEqual(expected, *actual, /*same_chunk_layout=*/false);
}

Expand Down Expand Up @@ -5005,8 +4996,7 @@ TEST_P(TestArrowReadDictionary, IncrementalReads) {

// Read in one shot
ASSERT_OK_AND_ASSIGN(std::unique_ptr<FileReader> reader, GetReader());
std::shared_ptr<Table> expected;
ASSERT_OK_NO_THROW(reader->ReadTable(&expected));
ASSERT_OK_AND_ASSIGN(auto expected, reader->ReadTable());

ASSERT_OK_AND_ASSIGN(reader, GetReader());
std::unique_ptr<ColumnReader> col;
Expand Down Expand Up @@ -5138,7 +5128,7 @@ class TestArrowReadDeltaEncoding : public ::testing::Test {
ASSERT_OK_AND_ASSIGN(
auto parquet_reader,
FileReader::Make(pool, ParquetFileReader::OpenFile(file, false)));
ASSERT_OK(parquet_reader->ReadTable(out));
ASSERT_OK_AND_ASSIGN(*out, parquet_reader->ReadTable());
ASSERT_OK((*out)->ValidateFull());
}

Expand Down Expand Up @@ -5340,8 +5330,7 @@ TEST_P(TestNestedSchemaFilteredReader, ReadWrite) {
FileReaderBuilder builder;
ASSERT_OK_NO_THROW(builder.Open(std::make_shared<BufferReader>(buffer)));
ASSERT_OK(builder.properties(default_arrow_reader_properties())->Build(&reader));
std::shared_ptr<::arrow::Table> read_table;
ASSERT_OK_NO_THROW(reader->ReadTable(GetParam().indices_to_read, &read_table));
ASSERT_OK_AND_ASSIGN(auto read_table, reader->ReadTable(GetParam().indices_to_read));

std::shared_ptr<::arrow::Array> expected =
ArrayFromJSON(GetParam().expected_schema, GetParam().read_data);
Expand Down Expand Up @@ -5817,10 +5806,9 @@ TEST(TestArrowReadWrite, MultithreadedWrite) {
ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

// Read to verify the data.
std::shared_ptr<Table> result;
ASSERT_OK_AND_ASSIGN(auto reader,
OpenFile(std::make_shared<BufferReader>(buffer), pool));
ASSERT_OK_NO_THROW(reader->ReadTable(&result));
ASSERT_OK_AND_ASSIGN(auto result, reader->ReadTable());
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
}

Expand Down
24 changes: 19 additions & 5 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,12 @@ class FileReaderImpl : public FileReader {

std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;

Status ReadTable(const std::vector<int>& indices,
std::shared_ptr<Table>* out) override {
return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out);
Result<std::shared_ptr<Table>> ReadTable(
const std::vector<int>& column_indices) override {
std::shared_ptr<Table> table;
RETURN_NOT_OK(ReadRowGroups(Iota(reader_->metadata()->num_row_groups()),
column_indices, &table));
Comment on lines +208 to +209
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh... We need to add arrow::Result version of ReadRowGroups()...

Let's work on it as a separate task. Could you open an issue for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll open a new issue for the ReadRowGroups refactoring. I'll get to the code updates soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return table;
}

Status GetFieldReader(int i,
Expand Down Expand Up @@ -305,8 +308,8 @@ class FileReaderImpl : public FileReader {
return ReadColumn(i, Iota(reader_->metadata()->num_row_groups()), out);
}

Status ReadTable(std::shared_ptr<Table>* table) override {
return ReadTable(Iota(reader_->metadata()->num_columns()), table);
// Read every column in the file: reading "all columns" is just the
// column-subset overload applied to the full index range [0, num_columns).
Result<std::shared_ptr<Table>> ReadTable() override {
  const int num_columns = reader_->metadata()->num_columns();
  return ReadTable(Iota(num_columns));
}

Status ReadRowGroups(const std::vector<int>& row_groups,
Expand Down Expand Up @@ -1339,6 +1342,17 @@ Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indice
return Status::OK();
}

// Deprecated Status-based entry point; kept for backward compatibility.
// Forwards to the Result-returning overload and copies the value into *out.
Status FileReader::ReadTable(std::shared_ptr<Table>* out) {
  ARROW_ASSIGN_OR_RAISE(auto table, ReadTable());
  *out = std::move(table);
  return Status::OK();
}

// Deprecated Status-based entry point for reading a column subset; kept for
// backward compatibility. Forwards to the Result-returning overload.
Status FileReader::ReadTable(const std::vector<int>& column_indices,
                             std::shared_ptr<Table>* out) {
  ARROW_ASSIGN_OR_RAISE(auto table, ReadTable(column_indices));
  *out = std::move(table);
  return Status::OK();
}

Status FileReader::Make(::arrow::MemoryPool* pool,
std::unique_ptr<ParquetFileReader> reader,
const ArrowReaderProperties& properties,
Expand Down
15 changes: 12 additions & 3 deletions cpp/src/parquet/arrow/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@ class PARQUET_EXPORT FileReader {
int64_t rows_to_readahead = 0) = 0;

/// Read all columns into a Table
virtual ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out) = 0;
virtual ::arrow::Result<std::shared_ptr<::arrow::Table>> ReadTable() = 0;

/// \deprecated Deprecated in 24.0.0. Use arrow::Result version instead.
ARROW_DEPRECATED("Deprecated in 24.0.0. Use arrow::Result version instead.")
::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out);

/// \brief Read the given columns into a Table
///
Expand All @@ -254,8 +258,13 @@ class PARQUET_EXPORT FileReader {
/// manifest().schema_fields to get the top level fields, and then walk the
/// tree to identify the relevant leaf fields and access its column_index.
/// To get the total number of leaf fields, use FileMetadata.num_columns().
virtual ::arrow::Status ReadTable(const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out) = 0;
virtual ::arrow::Result<std::shared_ptr<::arrow::Table>> ReadTable(
const std::vector<int>& column_indices) = 0;

/// \deprecated Deprecated in 24.0.0. Use arrow::Result version instead.
ARROW_DEPRECATED("Deprecated in 24.0.0. Use arrow::Result version instead.")
::arrow::Status ReadTable(const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out);

virtual ::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out) = 0;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/arrow/reader_writer_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ static void BenchmarkReadTable(::benchmark::State& state, const Table& table,
EXIT_NOT_OK(arrow_reader_result.status());
auto arrow_reader = std::move(*arrow_reader_result);

std::shared_ptr<Table> table;
EXIT_NOT_OK(arrow_reader->ReadTable(&table));
auto table_result = arrow_reader->ReadTable();
EXIT_NOT_OK(table_result.status());
}

if (num_values == -1) {
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/parquet/chunker_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ Result<std::shared_ptr<Table>> ConcatAndCombine(
}

Result<std::shared_ptr<Table>> ReadTableFromBuffer(const std::shared_ptr<Buffer>& data) {
std::shared_ptr<Table> result;
FileReaderBuilder builder;
std::unique_ptr<FileReader> reader;
auto props = default_arrow_reader_properties();
Expand All @@ -329,7 +328,7 @@ Result<std::shared_ptr<Table>> ReadTableFromBuffer(const std::shared_ptr<Buffer>
RETURN_NOT_OK(builder.memory_pool(::arrow::default_memory_pool())
->properties(props)
->Build(&reader));
RETURN_NOT_OK(reader->ReadTable(&result));
ARROW_ASSIGN_OR_RAISE(auto result, reader->ReadTable());
return result;
}

Expand Down
Loading
Loading