-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Expand file tree
/
Copy pathresult_sink_operator.h
More file actions
188 lines (162 loc) · 7.25 KB
/
result_sink_operator.h
File metadata and controls
188 lines (162 loc) · 7.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/PlanNodes_types.h>
#include <stdint.h>
#include "exec/operator/operator.h"
#include "exec/sink/writer/result_writer.h"
#include "runtime/result_block_buffer.h"
namespace doris {
#include "common/compile_check_begin.h"
class ResultBlockBufferBase;
/// Options describing how a result set is written out to files, copied from the
/// Thrift struct TResultFileSinkOptions.
///
/// Every member has a deterministic value after construction: members whose
/// Thrift field is optional and unset keep the in-class default declared below.
struct ResultFileOptions {
    // [[deprecated]]
    // True unless broker addresses were supplied in the Thrift options.
    bool is_local_file = true;
    std::string file_path;
    // Always overwritten by the constructor; value-initialized for safety.
    TFileFormatType::type file_format {};
    std::string column_separator;
    std::string line_delimiter;
    size_t max_file_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB
    std::vector<TNetworkAddress> broker_addresses;
    std::map<std::string, std::string> broker_properties;
    std::string success_file_name;
    std::vector<std::vector<std::string>> schema; // not used in outfile with parquet format
    std::map<std::string, std::string> file_properties; // not used in outfile with parquet format

    std::vector<TParquetSchema> parquet_schemas;
    // The two enum members below are only assigned when the corresponding optional
    // Thrift field is set. They are value-initialized (zero) so that they never hold
    // an indeterminate value when the field is absent.
    TParquetCompressionType::type parquet_commpression_type {};
    TParquetVersion::type parquet_version {};
    bool parquert_disable_dictionary = false;
    bool enable_int96_timestamps = false;
    // Note: outfile with parquet format has deprecated 9:schema and 10:file_properties.
    // To stay compatible during upgrades, this flag records which form was received.
    // The code version is 1.1.2 now; once the version is past 1.2 this can be removed.
    bool is_refactor_before_flag = false;
    std::string orc_schema;
    // Only assigned when the optional Thrift field is set; value-initialized (zero)
    // so it is never indeterminate.
    TFileCompressType::type orc_compression_type {};
    // currently only for csv
    // TODO: we should merge parquet_commpression_type/orc_compression_type/compression_type
    TFileCompressType::type compression_type = TFileCompressType::PLAIN;

    // Deprecated compatibility flag. New FE handles outfile delete_existing_files in FE
    // and clears this field before sending the result sink to BE. Keep reading it here
    // only for compatibility with older FE during rolling upgrade.
    bool delete_existing_files = false;
    std::string file_suffix;
    // Write a BOM when exporting to CSV format.
    bool with_bom = false;

    int64_t orc_writer_version = 0;

    /// Builds the options from the Thrift representation.
    ///
    /// @param t_opt Thrift options. Optional fields are copied only when their
    ///              `__isset` flag is true; otherwise the in-class default (or the
    ///              fallback spelled out below) is kept.
    ResultFileOptions(const TResultFileSinkOptions& t_opt) {
        file_path = t_opt.file_path;
        file_format = t_opt.file_format;
        // Fall back to tab / newline when the separators are not provided.
        column_separator = t_opt.__isset.column_separator ? t_opt.column_separator : "\t";
        line_delimiter = t_opt.__isset.line_delimiter ? t_opt.line_delimiter : "\n";
        max_file_size_bytes =
                t_opt.__isset.max_file_size_bytes ? t_opt.max_file_size_bytes : max_file_size_bytes;
        // Deprecated compatibility path. New FE should already have cleared this flag.
        delete_existing_files =
                t_opt.__isset.delete_existing_files ? t_opt.delete_existing_files : false;
        file_suffix = t_opt.file_suffix;
        with_bom = t_opt.with_bom;

        // is_local_file starts as true (in-class default); the presence of broker
        // addresses switches it off.
        if (t_opt.__isset.broker_addresses) {
            broker_addresses = t_opt.broker_addresses;
            is_local_file = false;
        }
        if (t_opt.__isset.broker_properties) {
            broker_properties = t_opt.broker_properties;
        }
        if (t_opt.__isset.success_file_name) {
            success_file_name = t_opt.success_file_name;
        }
        // Order matters for is_refactor_before_flag: the legacy `schema` field turns
        // it on, and a later `parquet_schemas` field turns it back off.
        if (t_opt.__isset.schema) {
            schema = t_opt.schema;
            is_refactor_before_flag = true;
        }
        if (t_opt.__isset.file_properties) {
            file_properties = t_opt.file_properties;
        }
        if (t_opt.__isset.parquet_schemas) {
            is_refactor_before_flag = false;
            parquet_schemas = t_opt.parquet_schemas;
        }
        if (t_opt.__isset.parquet_compression_type) {
            parquet_commpression_type = t_opt.parquet_compression_type;
        }
        if (t_opt.__isset.parquet_disable_dictionary) {
            parquert_disable_dictionary = t_opt.parquet_disable_dictionary;
        }
        if (t_opt.__isset.parquet_version) {
            parquet_version = t_opt.parquet_version;
        }
        if (t_opt.__isset.enable_int96_timestamps) {
            enable_int96_timestamps = t_opt.enable_int96_timestamps;
        }
        if (t_opt.__isset.orc_schema) {
            orc_schema = t_opt.orc_schema;
        }
        if (t_opt.__isset.orc_compression_type) {
            orc_compression_type = t_opt.orc_compression_type;
        }
        if (t_opt.__isset.orc_writer_version) {
            orc_writer_version = t_opt.orc_writer_version;
        }
        if (t_opt.__isset.compression_type) {
            compression_type = t_opt.compression_type;
        }
    }
};
// Size constant for the result sink buffer: 32 * 1024 == 32768.
// NOTE(review): the unit is not stated in this header; confirm against the users of this constant.
constexpr int RESULT_SINK_BUFFER_SIZE = 32 * 1024;
// Local state for the result sink. Derives from PipelineXSinkLocalState<BasicSharedState>
// and is the local-state type that ResultSinkOperatorX is instantiated with.
// init/open/close are only declared here; their definitions live outside this header.
class ResultSinkLocalState final : public PipelineXSinkLocalState<BasicSharedState> {
// NOTE(review): this macro is defined elsewhere and may change the current access
// level, so keep it as the first item in the class body.
ENABLE_FACTORY_CREATOR(ResultSinkLocalState);
using Base = PipelineXSinkLocalState<BasicSharedState>;
public:
// Forwards the parent operator and runtime state to the base class unchanged.
ResultSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {}
// Lifecycle hooks overriding the base class; each returns a Status.
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
// exec_status is passed by value. NOTE(review): presumably the final execution
// status of the task; confirm against the definition.
Status close(RuntimeState* state, Status exec_status) override;
private:
// The operator is a friend so it can access the private members below.
friend class ResultSinkOperatorX;
// Expression contexts held by this local state.
// NOTE(review): presumably for the output expressions; confirm in the .cpp.
VExprContextSPtrs _output_vexpr_ctxs;
// Shared handle to a result block buffer; null until assigned outside this header.
std::shared_ptr<ResultBlockBufferBase> _sender = nullptr;
// Shared handle to a result writer; null until assigned outside this header.
std::shared_ptr<ResultWriter> _writer = nullptr;
// Profile counters; null until assigned outside this header.
// NOTE(review): names suggest they time row-id fetching and data writing; confirm.
RuntimeProfile::Counter* _fetch_row_id_timer = nullptr;
RuntimeProfile::Counter* _write_data_timer = nullptr;
};
// Result sink operator. Derives from DataSinkOperatorX<ResultSinkLocalState>, so
// ResultSinkLocalState is its local-state type. All member functions are only
// declared here; their definitions live outside this header.
class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState> {
public:
// NOTE(review): row_desc and select_exprs are presumably what the reference members
// _row_desc and _t_output_expr bind to; if so, both must outlive this operator.
// Confirm in the constructor definition.
ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& select_exprs, const TResultSink& sink);
Status prepare(RuntimeState* state) override;
// Sink entry point taking an input block and an end-of-stream flag (eos).
Status sink(RuntimeState* state, Block* in_block, bool eos) override;
private:
// The local state is a friend so it can access the private members below.
friend class ResultSinkLocalState;
// NOTE(review): presumably the row-id based fetch step configured by _fetch_option;
// confirm against the definition.
Status _second_phase_fetch_data(RuntimeState* state, Block* final_block);
// Immutable after construction (const members).
const TResultSinkType::type _sink_type;
const int _result_sink_buffer_size_rows;
// set file options when sink type is FILE
std::unique_ptr<ResultFileOptions> _file_opts = nullptr;
// Owned by the RuntimeState.
// Stored by reference: the referenced object must outlive this operator.
const RowDescriptor& _row_desc;
// Owned by the RuntimeState.
// Stored by reference: the referenced vector must outlive this operator.
const std::vector<TExpr>& _t_output_expr;
VExprContextSPtrs _output_vexpr_ctxs;
// for fetch data by rowids
const TFetchOption _fetch_option;
// Shared handle to a result block buffer; null until assigned outside this header.
std::shared_ptr<ResultBlockBufferBase> _sender = nullptr;
};
#include "common/compile_check_end.h"
} // namespace doris