diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index 51cca497485..cf487f07144 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -18,6 +18,7 @@ #include "arrow/adapters/orc/adapter.h" #include +#include #include #include #include @@ -33,6 +34,7 @@ #include "arrow/io/interfaces.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" +#include "arrow/scalar.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/table_builder.h" @@ -43,6 +45,8 @@ #include "arrow/util/key_value_metadata.h" #include "arrow/util/macros.h" #include "orc/Exceptions.hh" +#include "orc/Statistics.hh" +#include "orc/Type.hh" // alias to not interfere with nested orc namespace namespace liborc = orc; @@ -98,6 +102,114 @@ namespace { // The following is required by ORC to be uint64_t constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024;
+// Millisecond bounds that convert to int64_t nanoseconds (~292 years around the epoch).
+// The max leaves headroom for the sub-millisecond nanos (<= 999,999) added afterwards.
+constexpr int64_t kMaxMillisForNanos =
+    (std::numeric_limits<int64_t>::max() - 999999LL) / 1000000LL;
+constexpr int64_t kMinMillisForNanos = std::numeric_limits<int64_t>::lowest() / 1000000LL;
+
+// Whether millis, plus its sub-millisecond nanos, converts to nanoseconds within int64_t.
+bool MillisFitInNanos(int64_t millis) { + return millis >= kMinMillisForNanos && millis <= kMaxMillisForNanos; +} + +// Convert ORC column statistics to Arrow OrcColumnStatistics +// Returns a Result with the converted statistics, or an error if conversion fails +Result ConvertColumnStatistics( + const liborc::ColumnStatistics* orc_stats) { + OrcColumnStatistics stats; + + stats.has_null = orc_stats->hasNull(); + stats.num_values = static_cast(orc_stats->getNumberOfValues()); + stats.has_min_max = false; + stats.min = nullptr; + stats.max = nullptr; + + // Try to extract min/max based on the ORC column statistics type. + if (const auto* int_stats = + dynamic_cast(orc_stats)) { + if (int_stats->hasMinimum() && int_stats->hasMaximum()) { + stats.has_min_max = true; + stats.min = std::make_shared(int_stats->getMinimum()); + stats.max = std::make_shared(int_stats->getMaximum()); + } + } else if (const auto* double_stats = + dynamic_cast(orc_stats)) { + if (double_stats->hasMinimum() && double_stats->hasMaximum()) { + double min_val = double_stats->getMinimum(); + double max_val = double_stats->getMaximum(); + if (!std::isnan(min_val) && !std::isnan(max_val)) { + stats.has_min_max = true; + stats.min = std::make_shared(min_val); + stats.max = std::make_shared(max_val); + } + } + } else if (const auto* string_stats = + dynamic_cast(orc_stats)) { + if (string_stats->hasMinimum() && string_stats->hasMaximum()) { + stats.has_min_max = true; + stats.min = std::make_shared(string_stats->getMinimum()); + stats.max = std::make_shared(string_stats->getMaximum()); + } + } else if (const auto* date_stats = + dynamic_cast(orc_stats)) { + if (date_stats->hasMinimum() && date_stats->hasMaximum()) { + stats.has_min_max = true; + stats.min = std::make_shared(date_stats->getMinimum()); + stats.max = std::make_shared(date_stats->getMaximum()); + } + } else if (const auto* ts_stats = + dynamic_cast(orc_stats)) { + if (ts_stats->hasMinimum() && ts_stats->hasMaximum()) { + // ORC returns 
millis + sub-millisecond nanos (0-999999). + // Convert to total nanoseconds, skipping if millis would overflow. + int64_t min_millis = ts_stats->getMinimum(); + int64_t max_millis = ts_stats->getMaximum(); + if (MillisFitInNanos(min_millis) && MillisFitInNanos(max_millis)) { + stats.has_min_max = true; + auto ts_type = timestamp(TimeUnit::NANO); + stats.min = std::make_shared( + min_millis * 1000000LL + ts_stats->getMinimumNanos(), ts_type); + stats.max = std::make_shared( + max_millis * 1000000LL + ts_stats->getMaximumNanos(), ts_type); + } + } + } else if (const auto* decimal_stats = + dynamic_cast(orc_stats)) { + if (decimal_stats->hasMinimum() && + decimal_stats->hasMaximum()) { + liborc::Decimal min_dec = decimal_stats->getMinimum(); + liborc::Decimal max_dec = decimal_stats->getMaximum(); + + if (min_dec.scale != max_dec.scale) { + // Corrupted stats: scales should always match within + // a column. has_min_max remains false (conservative). + } else { + stats.has_min_max = true; + + Decimal128 min_d128(min_dec.value.getHighBits(), + min_dec.value.getLowBits()); + Decimal128 max_d128(max_dec.value.getHighBits(), + max_dec.value.getLowBits()); + + // Precision 38 is max for Decimal128; the dataset + // layer will Cast() to the actual column type. + auto dec_type = decimal128(38, min_dec.scale); + + stats.min = + std::make_shared(min_d128, + dec_type); + stats.max = + std::make_shared(max_d128, + dec_type); + } + } + } + // Other types (Boolean, Binary, Collection, etc.) 
don't have min/max statistics + + return stats; +} + using internal::checked_cast; class ArrowInputFile : public liborc::InputStream { @@ -204,6 +316,57 @@ Status CheckTimeZoneDatabaseAvailability() { } #endif +// Recursively build OrcSchemaField tree by walking paired ORC and Arrow types +Status BuildOrcSchemaFieldsRecursive(const liborc::Type* orc_type, + const std::shared_ptr& arrow_field, + OrcSchemaField* out_field) { + out_field->field = arrow_field; + out_field->orc_column_id = static_cast(orc_type->getColumnId()); + + // For struct types, recursively build children + if (arrow_field->type()->id() == Type::STRUCT && + orc_type->getKind() == liborc::STRUCT) { + const auto& struct_type = checked_cast(*arrow_field->type()); + size_t num_children = struct_type.num_fields(); + out_field->children.resize(num_children); + + for (size_t i = 0; i < num_children; ++i) { + const liborc::Type* orc_subtype = orc_type->getSubtype(i); + RETURN_NOT_OK(BuildOrcSchemaFieldsRecursive( + orc_subtype, struct_type.field(static_cast(i)), + &out_field->children[i])); + } + } + // For list types, handle the child + else if (arrow_field->type()->id() == Type::LIST && + orc_type->getKind() == liborc::LIST) { + const auto& list_type = checked_cast(*arrow_field->type()); + out_field->children.resize(1); + const liborc::Type* orc_subtype = orc_type->getSubtype(0); + RETURN_NOT_OK(BuildOrcSchemaFieldsRecursive(orc_subtype, list_type.value_field(), + &out_field->children[0])); + } + // For map types, handle key and value children + else if (arrow_field->type()->id() == Type::MAP && + orc_type->getKind() == liborc::MAP) { + const auto& map_type = checked_cast(*arrow_field->type()); + out_field->children.resize(2); + const liborc::Type* key_type = orc_type->getSubtype(0); + const liborc::Type* value_type = orc_type->getSubtype(1); + RETURN_NOT_OK(BuildOrcSchemaFieldsRecursive(key_type, map_type.key_field(), + &out_field->children[0])); + 
RETURN_NOT_OK(BuildOrcSchemaFieldsRecursive(value_type, map_type.item_field(), + &out_field->children[1])); + } + // Union types are intentionally treated as leaves. ORC stores a single set + // of column statistics for the union that aggregates across all variants, + // so a min/max from one variant is mixed with values from other variants. + // This makes per-variant predicate pushdown unsound, and the dataset layer + // won't generate filter expressions targeting union children. + + return Status::OK(); +} + } // namespace class ORCFileReader::Impl { @@ -409,6 +572,44 @@ class ORCFileReader::Impl { return ReadBatch(opts, schema, stripes_[static_cast(stripe)].num_rows); } + Result> ReadStripes( + const std::vector& stripe_indices) { + if (stripe_indices.empty()) { + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema()); + return Table::MakeEmpty(schema); + } + + std::vector> batches; + batches.reserve(stripe_indices.size()); + for (int64_t stripe_index : stripe_indices) { + ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index)); + batches.push_back(std::move(batch)); + } + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema()); + return Table::FromRecordBatches(schema, std::move(batches)); + } + + Result> ReadStripes(const std::vector& stripe_indices, + const std::vector& include_indices) { + if (stripe_indices.empty()) { + liborc::RowReaderOptions opts = DefaultRowReaderOptions(); + RETURN_NOT_OK(SelectIndices(&opts, include_indices)); + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts)); + return Table::MakeEmpty(schema); + } + + std::vector> batches; + batches.reserve(stripe_indices.size()); + for (int64_t stripe_index : stripe_indices) { + ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index, include_indices)); + batches.push_back(std::move(batch)); + } + liborc::RowReaderOptions opts = DefaultRowReaderOptions(); + RETURN_NOT_OK(SelectIndices(&opts, include_indices)); + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts)); + return 
Table::FromRecordBatches(schema, std::move(batches)); + } + Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) { ARROW_RETURN_IF(stripe < 0 || stripe >= NumberOfStripes(), Status::Invalid("Out of bounds stripe: ", stripe)); @@ -548,6 +749,101 @@ class ORCFileReader::Impl { return NextStripeReader(batch_size, empty_vec); } + Result GetColumnStatistics(int column_index) { + ORC_BEGIN_CATCH_NOT_OK + std::unique_ptr file_stats = reader_->getStatistics(); + if (column_index < 0 || + static_cast(column_index) >= file_stats->getNumberOfColumns()) { + return Status::Invalid("Column index ", column_index, " out of range [0, ", + file_stats->getNumberOfColumns(), ")"); + } + // NOTE: col_stats is a non-owning pointer into file_stats. + // ConvertColumnStatistics copies all values into Arrow scalars synchronously, + // so the pointer remains valid for the duration of this call. + const liborc::ColumnStatistics* col_stats = + file_stats->getColumnStatistics(static_cast(column_index)); + return ConvertColumnStatistics(col_stats); + ORC_END_CATCH_NOT_OK + } + + Result GetStripeColumnStatistics(int64_t stripe_index, + int column_index) { + ORC_BEGIN_CATCH_NOT_OK + if (stripe_index < 0 || stripe_index >= static_cast(stripes_.size())) { + return Status::Invalid("Stripe index ", stripe_index, " out of range"); + } + std::unique_ptr stripe_stats = + reader_->getStripeStatistics(static_cast(stripe_index)); + if (column_index < 0 || + static_cast(column_index) >= stripe_stats->getNumberOfColumns()) { + return Status::Invalid("Column index ", column_index, " out of range [0, ", + stripe_stats->getNumberOfColumns(), ")"); + } + // NOTE: col_stats is a non-owning pointer into stripe_stats. + // ConvertColumnStatistics copies all values into Arrow scalars synchronously, + // so the pointer remains valid for the duration of this call. 
+ const liborc::ColumnStatistics* col_stats = + stripe_stats->getColumnStatistics(static_cast(column_index)); + return ConvertColumnStatistics(col_stats); + ORC_END_CATCH_NOT_OK + } + + Result> GetStripeStatistics( + int64_t stripe_index, const std::vector& column_indices) { + ORC_BEGIN_CATCH_NOT_OK + if (stripe_index < 0 || stripe_index >= static_cast(stripes_.size())) { + return Status::Invalid("Stripe index ", stripe_index, " out of range"); + } + std::unique_ptr stripe_stats = + reader_->getStripeStatistics(static_cast(stripe_index)); + std::vector results; + results.reserve(column_indices.size()); + for (int col_idx : column_indices) { + if (col_idx < 0 || + static_cast(col_idx) >= stripe_stats->getNumberOfColumns()) { + return Status::Invalid("Column index ", col_idx, " out of range [0, ", + stripe_stats->getNumberOfColumns(), ")"); + } + // NOTE: col_stats is a non-owning pointer into stripe_stats. + // ConvertColumnStatistics copies all values into Arrow scalars synchronously, + // so the pointer remains valid for the duration of this call. 
+ const liborc::ColumnStatistics* col_stats = + stripe_stats->getColumnStatistics(static_cast(col_idx)); + ARROW_ASSIGN_OR_RAISE(auto converted, ConvertColumnStatistics(col_stats)); + results.push_back(std::move(converted)); + } + return results; + ORC_END_CATCH_NOT_OK + } + + Result> BuildSchemaManifest( + const std::shared_ptr& arrow_schema) { + auto manifest = std::make_shared(); + + const liborc::Type& orc_root_type = reader_->getType(); + + if (orc_root_type.getKind() != liborc::STRUCT) { + return Status::Invalid("ORC root type must be STRUCT"); + } + + int num_fields = arrow_schema->num_fields(); + if (static_cast(num_fields) != orc_root_type.getSubtypeCount()) { + return Status::Invalid("Arrow schema field count (", num_fields, + ") does not match ORC type subtype count (", + orc_root_type.getSubtypeCount(), ")"); + } + + manifest->schema_fields.resize(static_cast(num_fields)); + + for (int i = 0; i < num_fields; ++i) { + const liborc::Type* orc_subtype = orc_root_type.getSubtype(static_cast(i)); + RETURN_NOT_OK(BuildOrcSchemaFieldsRecursive( + orc_subtype, arrow_schema->field(i), &manifest->schema_fields[i])); + } + + return manifest; + } + private: MemoryPool* pool_; std::unique_ptr reader_; @@ -613,6 +909,17 @@ Result> ORCFileReader::ReadStripe( return impl_->ReadStripe(stripe, include_names); } +Result> ORCFileReader::ReadStripes( + const std::vector& stripe_indices) { + return impl_->ReadStripes(stripe_indices); +} + +Result> ORCFileReader::ReadStripes( + const std::vector& stripe_indices, + const std::vector& include_indices) { + return impl_->ReadStripes(stripe_indices, include_indices); +} + Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); } Result> ORCFileReader::NextStripeReader( @@ -678,6 +985,25 @@ std::string ORCFileReader::GetSerializedFileTail() { return impl_->GetSerializedFileTail(); } +Result ORCFileReader::GetColumnStatistics(int column_index) { + return impl_->GetColumnStatistics(column_index); +} + +Result 
ORCFileReader::GetStripeColumnStatistics( + int64_t stripe_index, int column_index) { + return impl_->GetStripeColumnStatistics(stripe_index, column_index); +} + +Result> ORCFileReader::GetStripeStatistics( + int64_t stripe_index, const std::vector& column_indices) { + return impl_->GetStripeStatistics(stripe_index, column_indices); +} + +Result> ORCFileReader::BuildSchemaManifest( + const std::shared_ptr& arrow_schema) const { + return impl_->BuildSchemaManifest(arrow_schema); +} + namespace { class ArrowOutputStream : public liborc::OutputStream { diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h index 4ffff81f355..6d0a6011f41 100644 --- a/cpp/src/arrow/adapters/orc/adapter.h +++ b/cpp/src/arrow/adapters/orc/adapter.h @@ -35,6 +35,23 @@ namespace arrow { namespace adapters { namespace orc { +// Forward declarations +struct OrcSchemaManifest; + +/// \brief Column statistics from an ORC file +struct OrcColumnStatistics { + /// \brief Whether the column contains null values + bool has_null; + /// \brief Total number of values in the column + int64_t num_values; + /// \brief Whether min/max statistics are available + bool has_min_max; + /// \brief Minimum value (nullptr if not available) + std::shared_ptr min; + /// \brief Maximum value (nullptr if not available) + std::shared_ptr max; +}; + /// \brief Information about an ORC stripe struct StripeInformation { /// \brief Offset of the stripe from the start of the file, in bytes @@ -129,6 +146,29 @@ class ARROW_EXPORT ORCFileReader { Result> ReadStripe( int64_t stripe, const std::vector& include_names); + /// \brief Read multiple selected stripes as a Table + /// + /// Reads only the specified stripes and concatenates them into a single table. + /// This is useful for stripe-selective reading based on predicate pushdown. + /// If stripe_indices is empty, returns an empty table with the file's schema. 
+ /// + /// \param[in] stripe_indices the indices of stripes to read + /// \return the returned Table containing data from the selected stripes + Result> ReadStripes( + const std::vector& stripe_indices); + + /// \brief Read multiple selected stripes with column selection as a Table + /// + /// Reads only the specified stripes and selected columns, concatenating them + /// into a single table. + /// If stripe_indices is empty, returns an empty table with the selected schema. + /// + /// \param[in] stripe_indices the indices of stripes to read + /// \param[in] include_indices the selected field indices to read + /// \return the returned Table containing data from the selected stripes and columns + Result> ReadStripes(const std::vector& stripe_indices, + const std::vector& include_indices); + /// \brief Seek to designated row. Invoke NextStripeReader() after seek /// will return stripe reader starting from designated row. /// @@ -267,6 +307,43 @@ class ARROW_EXPORT ORCFileReader { /// \return A KeyValueMetadata object containing the ORC metadata Result> ReadMetadata(); + /// \brief Get file-level statistics for a column. + /// + /// \param[in] column_index the column index (0-based) + /// \return the column statistics + Result GetColumnStatistics(int column_index); + + /// \brief Get stripe-level statistics for a column. + /// + /// \param[in] stripe_index the stripe index (0-based) + /// \param[in] column_index the column index (0-based) + /// \return the column statistics for the specified stripe + Result GetStripeColumnStatistics(int64_t stripe_index, + int column_index); + + /// \brief Get stripe-level statistics for multiple columns at once. + /// + /// More efficient than calling GetStripeColumnStatistics() in a loop + /// because it parses the stripe's statistics object only once. 
+ /// + /// \param[in] stripe_index the stripe index (0-based) + /// \param[in] column_indices the column indices to retrieve statistics for + /// \return vector of column statistics, one per requested column index + Result> GetStripeStatistics( + int64_t stripe_index, const std::vector& column_indices); + + /// \brief Build a schema manifest mapping Arrow fields to ORC column IDs. + /// + /// Walks the ORC type tree paired with the Arrow schema to build a mapping + /// from Arrow field paths to ORC physical column indices, which are needed + /// for statistics lookup. ORC uses depth-first pre-order numbering where + /// column 0 is the root struct. + /// + /// \param[in] arrow_schema the Arrow schema (from ReadSchema()) + /// \return the built manifest or an error + Result> BuildSchemaManifest( + const std::shared_ptr& arrow_schema) const; + private: class Impl; std::unique_ptr impl_; diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index 714e61b22b1..750c47f25e8 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -36,6 +37,7 @@ #include "arrow/testing/matchers.h" #include "arrow/testing/random.h" #include "arrow/type.h" +#include "arrow/util/decimal.h" #include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" @@ -1179,4 +1181,914 @@ TEST_F(TestORCWriterMultipleWrite, MultipleWritesIntFieldRecordBatch) { AssertBatchWriteReadEqual(input_batches, expected_output_table, kDefaultSmallMemStreamSize * 100); } + +TEST(TestAdapterRead, GetColumnStatisticsInteger) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 1000; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + 
auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 1000); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col1_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col1_stats.num_values, row_count); + EXPECT_TRUE(col1_stats.has_min_max); + ASSERT_NE(col1_stats.min, nullptr); + ASSERT_NE(col1_stats.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col1_stats.min)->value, 0); + EXPECT_EQ(checked_pointer_cast(col1_stats.max)->value, 999); + + ASSERT_OK_AND_ASSIGN(auto col2_stats, reader->GetColumnStatistics(2)); + EXPECT_EQ(col2_stats.num_values, row_count); + EXPECT_TRUE(col2_stats.has_min_max); + ASSERT_NE(col2_stats.min, nullptr); + ASSERT_NE(col2_stats.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col2_stats.min)->value, 1000); + EXPECT_EQ(checked_pointer_cast(col2_stats.max)->value, 1999); +} + +TEST(TestAdapterRead, GetStripeColumnStatistics) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 500; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + 
internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i + 100); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto stripe0_stats, reader->GetStripeColumnStatistics(0, 1)); + EXPECT_TRUE(stripe0_stats.has_min_max); + EXPECT_EQ(checked_pointer_cast(stripe0_stats.min)->value, 100); + EXPECT_EQ(checked_pointer_cast(stripe0_stats.max)->value, 599); +} + +TEST(TestAdapterRead, GetColumnStatisticsString) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 5; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto str_batch = + internal::checked_cast(struct_batch->fields[0]); + + std::vector strings = {"apple", "banana", "cherry", "date", "elderberry"}; + std::string data_buffer; + for (const auto& s : strings) { + data_buffer += s; + } + + size_t offset = 0; + for (size_t i = 0; i < strings.size(); ++i) { + str_batch->data[i] = const_cast(&data_buffer[offset]); + str_batch->length[i] = static_cast(strings[i].size()); + offset += strings[i].size(); + } + struct_batch->numElements = row_count; + str_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + 
adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col_stats.num_values, row_count); + EXPECT_TRUE(col_stats.has_min_max); + ASSERT_NE(col_stats.min, nullptr); + ASSERT_NE(col_stats.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col_stats.min)->ToString(), "apple"); + EXPECT_EQ(checked_pointer_cast(col_stats.max)->ToString(), "elderberry"); +} + +TEST(TestAdapterRead, GetColumnStatisticsOutOfRange) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + EXPECT_THAT(reader->GetColumnStatistics(999), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); + + EXPECT_THAT(reader->GetStripeColumnStatistics(999, 1), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); +} + +TEST(TestAdapterRead, GetColumnStatisticsWithNulls) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, 
*type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + int_batch->notNull[i] = (i % 2 == 0) ? 1 : 0; + } + int_batch->hasNulls = true; + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_TRUE(col_stats.has_null); + EXPECT_TRUE(col_stats.has_min_max); +} + +TEST(TestAdapterRead, GetColumnStatisticsBoolean) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto bool_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write 60 true values and 40 false values + for (uint64_t i = 0; i < row_count; ++i) { + bool_batch->data[i] = (i < 60) ? 
1 : 0; + } + struct_batch->numElements = row_count; + bool_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col_stats.num_values, row_count); + EXPECT_FALSE(col_stats.has_min_max); // Boolean types don't have min/max + EXPECT_EQ(col_stats.min, nullptr); + EXPECT_EQ(col_stats.max, nullptr); +} + +TEST(TestAdapterRead, GetColumnStatisticsDate) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto date_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write dates: 0 (1970-01-01) through 9 (1970-01-10) + for (uint64_t i = 0; i < row_count; ++i) { + date_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + date_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col_stats.num_values, row_count); + EXPECT_TRUE(col_stats.has_min_max); + ASSERT_NE(col_stats.min, nullptr); + ASSERT_NE(col_stats.max, nullptr); + EXPECT_EQ(checked_pointer_cast(col_stats.min)->value, 0); 
+ EXPECT_EQ(checked_pointer_cast(col_stats.max)->value, 9); +} + +TEST(TestAdapterRead, GetColumnStatisticsTimestamp) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto ts_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write timestamps with both seconds and nanoseconds + for (uint64_t i = 0; i < row_count; ++i) { + ts_batch->data[i] = static_cast(i * 1000); // seconds + ts_batch->nanoseconds[i] = static_cast(i * 100); // nanoseconds + } + struct_batch->numElements = row_count; + ts_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col_stats.num_values, row_count); + EXPECT_TRUE(col_stats.has_min_max); + ASSERT_NE(col_stats.min, nullptr); + ASSERT_NE(col_stats.max, nullptr); + + // Verify the timestamps are TimestampScalar with correct nanosecond values. + // Written: data[i] = i*1000 seconds, nanoseconds[i] = i*100 sub-second nanos. + // Min (i=0): 0s + 0ns = 0 ns total. + // Max (i=9): 9000s + 900ns = 9000 * 1,000,000,000 + 900 = 9,000,000,000,900 ns. + // Statistics store millis + sub-ms-nanos (last 6 digits of ns). + // Conversion: millis * 1,000,000 + sub_ms_nanos = total nanoseconds. 
+ auto min_ts = checked_pointer_cast(col_stats.min); + auto max_ts = checked_pointer_cast(col_stats.max); + EXPECT_TRUE(min_ts->is_valid); + EXPECT_TRUE(max_ts->is_valid); + EXPECT_EQ(min_ts->value, 0); + // 9000 seconds + 900 nanoseconds + constexpr int64_t expected_max_ns = 9000LL * 1000000000LL + 900LL; + EXPECT_EQ(max_ts->value, expected_max_ns); +} + +TEST(TestAdapterRead, ReadStripesMultiple) { + // Create a test file and read all stripes using ReadStripes + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 1000); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 0); + + // Build vector of all stripe indices + std::vector stripe_indices; + for (int64_t i = 0; i < num_stripes; ++i) { + stripe_indices.push_back(i); + } + + // Read all stripes using ReadStripes + ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices)); + + // Should have all rows and correct schema + EXPECT_EQ(table->num_rows(), row_count); + 
EXPECT_EQ(table->num_columns(), 2); + EXPECT_EQ(table->schema()->field(0)->name(), "col1"); + EXPECT_EQ(table->schema()->field(1)->name(), "col2"); +} + +TEST(TestAdapterRead, ReadStripesWithColumnSelection) { + // Create a test file with multiple columns + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + auto double_batch = + internal::checked_cast(struct_batch->fields[2]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 100); + double_batch->data[i] = static_cast(i) + 0.5; + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + double_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 0); + + // Build vector of all stripe indices + std::vector stripe_indices; + for (int64_t i = 0; i < num_stripes; ++i) { + stripe_indices.push_back(i); + } + + // Read all stripes but only columns 0 and 2 (col1 and col3) + std::vector include_indices = {0, 2}; + ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices, include_indices)); + + // Should have all rows and 2 columns + 
EXPECT_EQ(table->num_rows(), row_count); + EXPECT_EQ(table->num_columns(), 2); + + // Verify we got the right columns (col2 was skipped) + EXPECT_EQ(table->schema()->field(0)->name(), "col1"); + EXPECT_EQ(table->schema()->field(1)->name(), "col3"); +} + +TEST(TestAdapterRead, ReadStripesOutOfRange) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 512; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 0); + + // Try to read a stripe index that's out of range + std::vector stripe_indices = {0, num_stripes}; // num_stripes is out of range + EXPECT_THAT(reader->ReadStripes(stripe_indices), + Raises(StatusCode::Invalid, testing::HasSubstr("Out of bounds stripe"))); +} + +TEST(TestAdapterRead, ReadStripesEmptyVector) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 512; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = 
internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + // Empty stripe indices should return an empty table with the file's schema + std::vector stripe_indices; + ASSERT_OK_AND_ASSIGN(auto table, reader->ReadStripes(stripe_indices)); + EXPECT_EQ(table->num_rows(), 0); + EXPECT_EQ(table->num_columns(), 1); + EXPECT_EQ(table->schema()->field(0)->name(), "col1"); +} + +TEST(TestAdapterRead, BuildSchemaManifest) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + + struct_batch->numElements = row_count; + for (size_t i = 0; i < struct_batch->fields.size(); ++i) { + struct_batch->fields[i]->numElements = row_count; + } + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + ASSERT_OK_AND_ASSIGN(auto arrow_schema, reader->ReadSchema()); + + // Build the manifest + ASSERT_OK_AND_ASSIGN(auto manifest, reader->BuildSchemaManifest(arrow_schema)); + + // Verify manifest field 
count matches Arrow schema field count + EXPECT_EQ(manifest->schema_fields.size(), arrow_schema->num_fields()); + EXPECT_EQ(manifest->schema_fields.size(), 3); + + // Field 0 ("col1") should map to ORC column 1 + EXPECT_EQ(manifest->schema_fields[0].field->name(), "col1"); + EXPECT_EQ(manifest->schema_fields[0].orc_column_id, 1); + EXPECT_TRUE(manifest->schema_fields[0].is_leaf()); + + // Field 1 ("col2") should map to ORC column 2 + EXPECT_EQ(manifest->schema_fields[1].field->name(), "col2"); + EXPECT_EQ(manifest->schema_fields[1].orc_column_id, 2); + EXPECT_TRUE(manifest->schema_fields[1].is_leaf()); + + // Field 2 ("col3") should map to ORC column 3 + EXPECT_EQ(manifest->schema_fields[2].field->name(), "col3"); + EXPECT_EQ(manifest->schema_fields[2].orc_column_id, 3); + EXPECT_TRUE(manifest->schema_fields[2].is_leaf()); +} + +TEST(TestAdapterRead, GetColumnStatisticsDoubleNaN) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto double_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write some normal values and some NaN values + for (uint64_t i = 0; i < row_count; ++i) { + if (i % 3 == 0) { + double_batch->data[i] = std::numeric_limits::quiet_NaN(); + } else { + double_batch->data[i] = static_cast(i); + } + } + struct_batch->numElements = row_count; + double_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + 
ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + // When NaN values are present, ORC may report NaN as min or max. + // Our guard should detect this and set has_min_max = false. + if (col_stats.has_min_max) { + // If ORC writer filtered NaN from statistics, min/max should be valid (non-NaN) + auto min_val = checked_pointer_cast(col_stats.min); + auto max_val = checked_pointer_cast(col_stats.max); + EXPECT_FALSE(std::isnan(min_val->value)); + EXPECT_FALSE(std::isnan(max_val->value)); + } + // Either has_min_max is false (NaN guard triggered), or min/max are non-NaN. + // Both outcomes are correct. +} + +TEST(TestAdapterRead, GetColumnStatisticsNegativeIndex) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 10; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + // Negative column index should return Invalid + EXPECT_THAT(reader->GetColumnStatistics(-1), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); + + // Negative column index in stripe stats should also return Invalid + EXPECT_THAT(reader->GetStripeColumnStatistics(0, -1), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); +} + 
+TEST(TestAdapterRead, GetStripeStatisticsBulk) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 100; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + auto bigint_batch = + internal::checked_cast(struct_batch->fields[1]); + auto double_batch = + internal::checked_cast(struct_batch->fields[2]); + + for (uint64_t i = 0; i < row_count; ++i) { + int_batch->data[i] = static_cast(i); + bigint_batch->data[i] = static_cast(i + 1000); + double_batch->data[i] = static_cast(i) + 0.5; + } + struct_batch->numElements = row_count; + int_batch->numElements = row_count; + bigint_batch->numElements = row_count; + double_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + // ORC column indices: 0=root struct, 1=col1, 2=col2, 3=col3 + std::vector column_indices = {1, 2, 3}; + + // Get bulk stats for stripe 0 + ASSERT_OK_AND_ASSIGN(auto bulk_stats, reader->GetStripeStatistics(0, column_indices)); + ASSERT_EQ(bulk_stats.size(), 3); + + // Verify bulk results match individual GetStripeColumnStatistics calls + for (size_t i = 0; i < column_indices.size(); ++i) { + ASSERT_OK_AND_ASSIGN(auto individual_stats, + reader->GetStripeColumnStatistics(0, column_indices[i])); + EXPECT_EQ(bulk_stats[i].has_null, individual_stats.has_null); + EXPECT_EQ(bulk_stats[i].num_values, individual_stats.num_values); + EXPECT_EQ(bulk_stats[i].has_min_max, 
individual_stats.has_min_max); + if (bulk_stats[i].has_min_max && individual_stats.has_min_max) { + EXPECT_TRUE(bulk_stats[i].min->Equals(*individual_stats.min)); + EXPECT_TRUE(bulk_stats[i].max->Equals(*individual_stats.max)); + } + } + + // Verify out-of-range column index in bulk API + std::vector bad_indices = {1, 999}; + EXPECT_THAT(reader->GetStripeStatistics(0, bad_indices), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); + + // Verify out-of-range stripe index in bulk API + EXPECT_THAT(reader->GetStripeStatistics(999, column_indices), + Raises(StatusCode::Invalid, testing::HasSubstr("out of range"))); +} + +TEST(TestAdapterRead, BuildSchemaManifestNested) { + // ORC type with nested struct and list. + // ORC column IDs (depth-first pre-order): + // 0: root struct + // 1: col1 (int) + // 2: col2 (struct) + // 3: col2.a (string) + // 4: col2.b (bigint) + // 5: col3 (array/list) + // 6: col3._elem (int) + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + std::unique_ptr type(liborc::Type::buildTypeFromString( + "struct,col3:array>")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 1; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = internal::checked_cast(batch.get()); + + // Set up col1 (int) + auto int_batch = + internal::checked_cast(struct_batch->fields[0]); + int_batch->data[0] = 42; + int_batch->numElements = row_count; + + // Set up col2 (struct) + auto inner_struct = + internal::checked_cast(struct_batch->fields[1]); + auto str_batch = + internal::checked_cast(inner_struct->fields[0]); + auto bigint_batch = + internal::checked_cast(inner_struct->fields[1]); + std::string str_data = "hello"; + str_batch->data[0] = const_cast(str_data.c_str()); + str_batch->length[0] = static_cast(str_data.size()); + str_batch->numElements = row_count; + bigint_batch->data[0] = 100; + bigint_batch->numElements = row_count; + 
inner_struct->numElements = row_count; + + // Set up col3 (array) - write one list with one element + auto list_batch = + internal::checked_cast(struct_batch->fields[2]); + auto list_elem_batch = + internal::checked_cast(list_batch->elements.get()); + list_batch->offsets[0] = 0; + list_batch->offsets[1] = 1; + list_elem_batch->data[0] = 7; + list_elem_batch->numElements = 1; + list_batch->numElements = row_count; + + struct_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + ASSERT_OK_AND_ASSIGN(auto arrow_schema, reader->ReadSchema()); + + ASSERT_OK_AND_ASSIGN(auto manifest, reader->BuildSchemaManifest(arrow_schema)); + ASSERT_EQ(manifest->schema_fields.size(), 3); + + // col1: leaf, orc_column_id=1 + EXPECT_EQ(manifest->schema_fields[0].field->name(), "col1"); + EXPECT_EQ(manifest->schema_fields[0].orc_column_id, 1); + EXPECT_TRUE(manifest->schema_fields[0].is_leaf()); + + // col2: struct with 2 children, orc_column_id=2 + EXPECT_EQ(manifest->schema_fields[1].field->name(), "col2"); + EXPECT_EQ(manifest->schema_fields[1].orc_column_id, 2); + EXPECT_FALSE(manifest->schema_fields[1].is_leaf()); + ASSERT_EQ(manifest->schema_fields[1].children.size(), 2); + EXPECT_EQ(manifest->schema_fields[1].children[0].field->name(), "a"); + EXPECT_EQ(manifest->schema_fields[1].children[0].orc_column_id, 3); + EXPECT_TRUE(manifest->schema_fields[1].children[0].is_leaf()); + EXPECT_EQ(manifest->schema_fields[1].children[1].field->name(), "b"); + EXPECT_EQ(manifest->schema_fields[1].children[1].orc_column_id, 4); + EXPECT_TRUE(manifest->schema_fields[1].children[1].is_leaf()); + + // col3: list with 1 child element, orc_column_id=5 + EXPECT_EQ(manifest->schema_fields[2].field->name(), "col3"); + 
EXPECT_EQ(manifest->schema_fields[2].orc_column_id, 5); + EXPECT_FALSE(manifest->schema_fields[2].is_leaf()); + ASSERT_EQ(manifest->schema_fields[2].children.size(), 1); + EXPECT_EQ(manifest->schema_fields[2].children[0].orc_column_id, 6); + EXPECT_TRUE(manifest->schema_fields[2].children[0].is_leaf()); + + // Test GetField() with various paths + EXPECT_EQ(manifest->GetField({}), nullptr); // empty path + EXPECT_EQ(manifest->GetField({99}), nullptr); // out of range + + // GetField({0}) -> col1 + const auto* f0 = manifest->GetField({0}); + ASSERT_NE(f0, nullptr); + EXPECT_EQ(f0->field->name(), "col1"); + EXPECT_EQ(f0->orc_column_id, 1); + + // GetField({1}) -> col2 (struct) + const auto* f1 = manifest->GetField({1}); + ASSERT_NE(f1, nullptr); + EXPECT_EQ(f1->field->name(), "col2"); + EXPECT_EQ(f1->orc_column_id, 2); + + // GetField({1, 0}) -> col2.a + const auto* f1_0 = manifest->GetField({1, 0}); + ASSERT_NE(f1_0, nullptr); + EXPECT_EQ(f1_0->field->name(), "a"); + EXPECT_EQ(f1_0->orc_column_id, 3); + + // GetField({1, 1}) -> col2.b + const auto* f1_1 = manifest->GetField({1, 1}); + ASSERT_NE(f1_1, nullptr); + EXPECT_EQ(f1_1->field->name(), "b"); + EXPECT_EQ(f1_1->orc_column_id, 4); + + // GetField({2, 0}) -> col3 element + const auto* f2_0 = manifest->GetField({2, 0}); + ASSERT_NE(f2_0, nullptr); + EXPECT_EQ(f2_0->orc_column_id, 6); + + // Out of range nested paths + EXPECT_EQ(manifest->GetField({1, 99}), nullptr); + EXPECT_EQ(manifest->GetField({0, 0}), nullptr); // col1 is leaf, no children +} + +TEST(TestAdapterRead, GetColumnStatisticsDecimal) { + MemoryOutputStream mem_stream(kDefaultMemStreamSize); + // decimal(10,2) uses Decimal64VectorBatch internally (precision <= 18) + std::unique_ptr type( + liborc::Type::buildTypeFromString("struct")); + + constexpr uint64_t stripe_size = 1024; + constexpr uint64_t row_count = 5; + + auto writer = CreateWriter(stripe_size, *type, &mem_stream); + auto batch = writer->createRowBatch(row_count); + auto struct_batch = 
internal::checked_cast(batch.get()); + auto dec_batch = + internal::checked_cast(struct_batch->fields[0]); + + // Write unscaled values with non-zero trailing digits to avoid ORC + // normalizing away trailing zeros in statistics (e.g. 1.00 → 1 scale 0). + // Values: 101, 203, 305, 407, 509 with scale=2 + // Represent: 1.01, 2.03, 3.05, 4.07, 5.09 + dec_batch->values[0] = 101; + dec_batch->values[1] = 203; + dec_batch->values[2] = 305; + dec_batch->values[3] = 407; + dec_batch->values[4] = 509; + struct_batch->numElements = row_count; + dec_batch->numElements = row_count; + writer->add(*batch); + writer->close(); + + std::shared_ptr in_stream(new io::BufferReader( + std::make_shared(reinterpret_cast(mem_stream.getData()), + static_cast(mem_stream.getLength())))); + + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); + + ASSERT_OK_AND_ASSIGN(auto col_stats, reader->GetColumnStatistics(1)); + EXPECT_EQ(col_stats.num_values, row_count); + EXPECT_FALSE(col_stats.has_null); + EXPECT_TRUE(col_stats.has_min_max); + ASSERT_NE(col_stats.min, nullptr); + ASSERT_NE(col_stats.max, nullptr); + + // Verify Decimal128Scalar values (unscaled integers with scale 2) + auto min_dec = checked_pointer_cast(col_stats.min); + auto max_dec = checked_pointer_cast(col_stats.max); + EXPECT_EQ(min_dec->value, Decimal128(101)); // 1.01 unscaled + EXPECT_EQ(max_dec->value, Decimal128(509)); // 5.09 unscaled + + // Verify the type has precision 38 and scale 2 (as set by ConvertColumnStatistics) + EXPECT_TRUE(min_dec->type->Equals(decimal128(38, 2))); + EXPECT_TRUE(max_dec->type->Equals(decimal128(38, 2))); +} + } // namespace arrow diff --git a/cpp/src/arrow/adapters/orc/util.cc b/cpp/src/arrow/adapters/orc/util.cc index 6974faae59b..2b786096931 100644 --- a/cpp/src/arrow/adapters/orc/util.cc +++ b/cpp/src/arrow/adapters/orc/util.cc @@ -1229,6 +1229,30 @@ int GetOrcMajorVersion() { return std::stoi(major_version); } +const OrcSchemaField* 
OrcSchemaManifest::GetField(const std::vector& path) const { + if (path.empty()) { + return nullptr; + } + + // Start with top-level field + if (path[0] < 0 || static_cast(path[0]) >= schema_fields.size()) { + return nullptr; + } + + const OrcSchemaField* current = &schema_fields[path[0]]; + + // Traverse nested path + for (size_t i = 1; i < path.size(); ++i) { + int child_index = path[i]; + if (child_index < 0 || static_cast(child_index) >= current->children.size()) { + return nullptr; + } + current = &current->children[child_index]; + } + + return current; +} + } // namespace orc } // namespace adapters } // namespace arrow diff --git a/cpp/src/arrow/adapters/orc/util.h b/cpp/src/arrow/adapters/orc/util.h index a18b11dda01..f6159420e74 100644 --- a/cpp/src/arrow/adapters/orc/util.h +++ b/cpp/src/arrow/adapters/orc/util.h @@ -32,6 +32,39 @@ namespace arrow { namespace adapters { namespace orc { +/// \brief Bridge between an Arrow Field and an ORC column index. +/// +/// Represents a field in the Arrow schema mapped to its corresponding ORC column ID +/// for statistics lookup. ORC uses depth-first pre-order numbering where column 0 +/// is the root struct. +struct ARROW_EXPORT OrcSchemaField { + /// Arrow field definition + std::shared_ptr field; + /// ORC physical column ID (for GetColumnStatistics lookup) + int orc_column_id; + /// Child fields for nested types (struct, list, map) + std::vector children; + /// Returns true if this is a leaf field (no children) + bool is_leaf() const { return children.empty(); } +}; + +/// \brief Bridge between Arrow schema and ORC schema. +/// +/// Maps Arrow field paths to ORC physical column indices for statistics lookup. +struct ARROW_EXPORT OrcSchemaManifest { + /// Top-level fields (parallel to Arrow schema fields) + std::vector schema_fields; + + /// \brief Find the OrcSchemaField for a given Arrow field path. + /// + /// Path is a sequence of child indices starting from the root. 
+ /// For example, path {1, 0} means schema_fields[1].children[0]. + /// + /// \param[in] path sequence of child indices from root to target field + /// \return pointer to the field if found, nullptr otherwise + const OrcSchemaField* GetField(const std::vector& path) const; +}; + Result> GetArrowType(const liborc::Type* type); Result> GetOrcType(const Schema& schema);