Skip to content

Commit 82192ee

Browse files
authored
Merge pull request #73 from ClickHouse/apply-clickhouse-patches-to-release-23.0.0
Apply clickhouse patches to release 23.0.0
2 parents eafe3a9 + b7fa91b commit 82192ee

12 files changed

Lines changed: 100 additions & 19 deletions

File tree

cpp/src/arrow/adapters/orc/adapter.cc

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -223,6 +223,10 @@ class ORCFileReader::Impl {
223223
return Init();
224224
}
225225

226+
virtual liborc::Reader* GetRawORCReader() {
227+
return reader_.get();
228+
}
229+
226230
Status Init() {
227231
int64_t nstripes = reader_->getNumberOfStripes();
228232
stripes_.resize(static_cast<size_t>(nstripes));
@@ -569,6 +573,15 @@ Result<std::unique_ptr<ORCFileReader>> ORCFileReader::Open(
569573
return result;
570574
}
571575

576+
Status ORCFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
577+
MemoryPool* pool, std::unique_ptr<ORCFileReader>* reader) {
578+
return Open(file, pool).Value(reader);
579+
}
580+
581+
liborc::Reader* ORCFileReader::GetRawORCReader() {
582+
return impl_->GetRawORCReader();
583+
}
584+
572585
Result<std::shared_ptr<const KeyValueMetadata>> ORCFileReader::ReadMetadata() {
573586
return impl_->ReadMetadata();
574587
}

cpp/src/arrow/adapters/orc/adapter.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <vector>
2323

2424
#include "arrow/adapters/orc/options.h"
25+
#include "arrow/adapters/orc/util.h"
2526
#include "arrow/io/interfaces.h"
2627
#include "arrow/memory_pool.h"
2728
#include "arrow/record_batch.h"
@@ -61,6 +62,19 @@ class ARROW_EXPORT ORCFileReader {
6162
static Result<std::unique_ptr<ORCFileReader>> Open(
6263
const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool);
6364

65+
/// \brief Creates a new ORC reader.
66+
///
67+
/// \param[in] file the data source
68+
/// \param[in] pool a MemoryPool to use for buffer allocations
69+
/// \param[out] reader the returned reader object
70+
/// \return Status
71+
ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
72+
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool,
73+
std::unique_ptr<ORCFileReader>* reader);
74+
75+
/// \brief Get ORC reader from inside.
76+
liborc::Reader* GetRawORCReader();
77+
6478
/// \brief Return the schema read from the ORC file
6579
///
6680
/// \return the returned Schema object

cpp/src/arrow/ipc/reader.cc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1384,6 +1384,23 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
13841384
return total;
13851385
}
13861386

1387+
Result<int64_t> RecordBatchCountRows(int i) override {
1388+
DCHECK_GE(i, 0);
1389+
DCHECK_LT(i, num_record_batches());
1390+
ARROW_ASSIGN_OR_RAISE(auto outer_message,
1391+
ReadMessageFromBlock(GetRecordBatchBlock(i)));
1392+
auto metadata = outer_message->metadata();
1393+
const flatbuf::Message* message = nullptr;
1394+
RETURN_NOT_OK(
1395+
internal::VerifyMessage(metadata->data(), metadata->size(), &message));
1396+
auto batch = message->header_as_RecordBatch();
1397+
if (batch == nullptr) {
1398+
return Status::IOError(
1399+
"Header-type of flatbuffer-encoded Message is not RecordBatch.");
1400+
}
1401+
return batch->length();
1402+
}
1403+
13871404
Status Open(const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
13881405
const IpcReadOptions& options) {
13891406
owned_file_ = file;

cpp/src/arrow/ipc/reader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ class ARROW_EXPORT RecordBatchFileReader
203203
/// \brief Computes the total number of rows in the file.
204204
virtual Result<int64_t> CountRows() = 0;
205205

206+
virtual Result<int64_t> RecordBatchCountRows(int i) = 0;
207+
206208
/// \brief Begin loading metadata for the desired batches into memory.
207209
///
208210
/// This method will also begin loading all dictionaries messages into memory.

cpp/src/arrow/table.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,6 @@ TableBatchReader::TableBatchReader(const Table& table)
654654
for (int i = 0; i < table.num_columns(); ++i) {
655655
column_data_[i] = table.column(i).get();
656656
}
657-
DCHECK(table_.Validate().ok());
658657
}
659658

660659
TableBatchReader::TableBatchReader(std::shared_ptr<Table> table)
@@ -668,7 +667,6 @@ TableBatchReader::TableBatchReader(std::shared_ptr<Table> table)
668667
for (int i = 0; i < owned_table_->num_columns(); ++i) {
669668
column_data_[i] = owned_table_->column(i).get();
670669
}
671-
DCHECK(table_.Validate().ok());
672670
}
673671

674672
std::shared_ptr<Schema> TableBatchReader::schema() const { return table_.schema(); }

cpp/src/arrow/type_fwd.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ using BufferVector = std::vector<std::shared_ptr<Buffer>>;
6060

6161
class DataType;
6262
class Field;
63-
class FieldRef;
63+
class FieldRef; /// NOLINT(bugprone-forward-declaration-namespace)
6464
class KeyValueMetadata;
6565
enum class Endianness;
6666
class Schema;

cpp/src/arrow/util/logging.cc

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#endif
2525
#include <cstdlib>
2626
#include <iostream>
27+
#include <sstream>
2728

2829
#ifdef ARROW_USE_GLOG
2930

@@ -65,33 +66,33 @@ class CerrLog {
6566
public:
6667
explicit CerrLog(ArrowLogLevel severity) : severity_(severity), has_logged_(false) {}
6768

68-
virtual ~CerrLog() {
69+
virtual ~CerrLog() noexcept(false) {
6970
if (has_logged_) {
70-
std::cerr << std::endl;
71+
stream << std::endl;
7172
}
7273
if (severity_ == ArrowLogLevel::ARROW_FATAL) {
73-
PrintBackTrace();
74-
std::abort();
74+
throw std::runtime_error(stream.str());
7575
}
7676
}
7777

7878
std::ostream& Stream() {
7979
has_logged_ = true;
80-
return std::cerr;
80+
return stream;
8181
}
8282

8383
template <class T>
8484
CerrLog& operator<<(const T& t) {
8585
if (severity_ != ArrowLogLevel::ARROW_DEBUG) {
8686
has_logged_ = true;
87-
std::cerr << t;
87+
stream << t;
8888
}
8989
return *this;
9090
}
9191

9292
protected:
9393
const ArrowLogLevel severity_;
9494
bool has_logged_;
95+
std::stringstream stream;
9596

9697
void PrintBackTrace() {
9798
#ifdef ARROW_WITH_BACKTRACE
@@ -250,7 +251,7 @@ std::ostream& ArrowLog::Stream() {
250251

251252
bool ArrowLog::IsEnabled() const { return is_enabled_; }
252253

253-
ArrowLog::~ArrowLog() {
254+
ArrowLog::~ArrowLog() noexcept(false) {
254255
if (logging_provider_ != nullptr) {
255256
delete reinterpret_cast<LoggingProvider*>(logging_provider_);
256257
logging_provider_ = nullptr;

cpp/src/arrow/util/logging.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ enum class ArrowLogLevel : int {
149149
// This is also a null log which does not output anything.
150150
class ARROW_EXPORT ArrowLogBase {
151151
public:
152-
virtual ~ArrowLogBase() {}
152+
virtual ~ArrowLogBase() noexcept(false) {}
153153

154154
virtual bool IsEnabled() const { return false; }
155155

@@ -168,7 +168,7 @@ class ARROW_EXPORT ArrowLogBase {
168168
class ARROW_EXPORT ArrowLog : public ArrowLogBase {
169169
public:
170170
ArrowLog(const char* file_name, int line_number, ArrowLogLevel severity);
171-
~ArrowLog() override;
171+
~ArrowLog() noexcept(false) override;
172172

173173
/// Return whether or not current logging instance is enabled.
174174
///

cpp/src/arrow/util/mutex.cc

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
#include <mutex>
2121

22-
#ifndef _WIN32
22+
#if !defined( _WIN32) && !defined(__ppc64__)
2323
# include <pthread.h>
2424
# include <atomic>
2525
#endif
@@ -59,7 +59,7 @@ Mutex::Guard Mutex::Lock() {
5959

6060
Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {}
6161

62-
#ifndef _WIN32
62+
#if !defined( _WIN32) && !defined(__ppc64__)
6363
namespace {
6464

6565
struct AfterForkState {
@@ -71,19 +71,42 @@ struct AfterForkState {
7171
// The leak (only in child processes) is a small price to pay for robustness.
7272
Mutex* mutex = nullptr;
7373

74+
enum State {
75+
INITIALIZED,
76+
IN_PROCESS,
77+
NOT_INITIALIZED,
78+
};
79+
80+
std::atomic_int state = INITIALIZED;
81+
7482
private:
7583
AfterForkState() {
7684
pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr, /*child=*/&AfterFork);
7785
}
7886

79-
static void AfterFork() { instance.mutex = new Mutex; }
87+
static void AfterFork() { instance.state.store(NOT_INITIALIZED); }
88+
8089
};
8190

8291
AfterForkState AfterForkState::instance;
8392
} // namespace
8493

85-
Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; }
86-
#endif // _WIN32
94+
Mutex* GlobalForkSafeMutex() {
95+
if (AfterForkState::instance.state.load() == AfterForkState::State::INITIALIZED) {
96+
return AfterForkState::instance.mutex;
97+
}
98+
99+
int expected = AfterForkState::State::NOT_INITIALIZED;
100+
if (AfterForkState::instance.state.compare_exchange_strong(expected, AfterForkState::State::IN_PROCESS)) {
101+
AfterForkState::instance.mutex = new Mutex;
102+
AfterForkState::instance.state.store(AfterForkState::State::INITIALIZED);
103+
} else {
104+
while (AfterForkState::instance.state.load() != AfterForkState::State::INITIALIZED);
105+
}
106+
107+
return AfterForkState::instance.mutex;
108+
}
109+
#endif // _WIN32 and __ppc64__
87110

88111
} // namespace util
89112
} // namespace arrow

cpp/src/arrow/util/mutex.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class ARROW_EXPORT Mutex {
6060
std::unique_ptr<Impl, void (*)(Impl*)> impl_;
6161
};
6262

63-
#ifndef _WIN32
63+
#if !defined(_WIN32) && !defined(__ppc64__)
6464
/// Return a pointer to a process-wide, process-specific Mutex that can be used
6565
/// at any point in a child process. NULL is returned when called in the parent.
6666
///

0 commit comments

Comments
 (0)