Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,8 @@ Status PushBrokerReader::_convert_to_output_block(vectorized::Block* block) {
vectorized::ColumnPtr column_ptr;

auto& ctx = _dest_expr_ctxs[dest_index];
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
column_ptr = _src_block.get_by_position(result_column_id).column;
RETURN_IF_ERROR(ctx->execute(&_src_block, column_ptr));
// column_ptr maybe a ColumnConst, convert it to a normal column
column_ptr = column_ptr->convert_to_full_column_if_const();
DCHECK(column_ptr);
Expand Down
7 changes: 3 additions & 4 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -902,10 +902,9 @@ size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos
Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
const vectorized::VExprContextSPtr& expr,
vectorized::IColumn* dst_column, size_t length) {
int result_col_id = -1;
RETURN_IF_ERROR(expr->execute(block, &result_col_id));
DCHECK_GE(result_col_id, 0);
auto column = block->get_by_position(result_col_id).column->convert_to_full_column_if_const();
vectorized::ColumnPtr column;
RETURN_IF_ERROR(expr->execute(block, column));
column = column->convert_to_full_column_if_const();
// iff dst_column is string, maybe overflow of 4G, so need ignore overflow
// the column is used by compare_at self to find the range, it's need convert it when overflow?
dst_column->insert_range_from_ignore_overflow(*column, 0, length);
Expand Down
18 changes: 10 additions & 8 deletions be/src/pipeline/exec/dict_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "common/status.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/functions/complex_hash_map_dictionary.h"
#include "vec/functions/dictionary_factory.h"
#include "vec/functions/dictionary_util.h"
Expand Down Expand Up @@ -53,20 +54,21 @@ Status DictSinkLocalState::load_dict(RuntimeState* state) {

for (long key_expr_id : p._key_output_expr_slots) {
auto key_expr_ctx = _output_vexpr_ctxs[key_expr_id];
int key_column_id = -1;
RETURN_IF_ERROR(key_expr_ctx->execute(&input_block, &key_column_id));
key_data.push_back(input_block.get_by_position(key_column_id));
vectorized::ColumnWithTypeAndName key_exec_data;
RETURN_IF_ERROR(key_expr_ctx->execute(&input_block, key_exec_data));

key_data.push_back(key_exec_data);
}

for (size_t i = 0; i < p._value_output_expr_slots.size(); i++) {
auto value_expr_id = p._value_output_expr_slots[i];
auto value_name = p._value_names[i];
auto value_expr_ctx = _output_vexpr_ctxs[value_expr_id];
int value_column_id = -1;
RETURN_IF_ERROR(value_expr_ctx->execute(&input_block, &value_column_id));
auto att_data = input_block.get_by_position(value_column_id);
att_data.name = value_name;
value_data.push_back(att_data);

vectorized::ColumnPtr value_column;
RETURN_IF_ERROR(value_expr_ctx->execute(&input_block, value_column));
auto value_type = value_expr_ctx->execute_type(&input_block);
value_data.push_back({value_column, value_type, value_name});
}

RETURN_IF_ERROR(check_dict_input_data(key_data, value_data, p._skip_null_key));
Expand Down
12 changes: 6 additions & 6 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,16 +309,16 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
}
vectorized::Block input_block = *origin_block;

std::vector<int> result_column_ids;
size_t bytes_usage = 0;
vectorized::ColumnsWithTypeAndName new_columns;
for (const auto& projections : local_state->_intermediate_projections) {
result_column_ids.resize(projections.size());
new_columns.resize(projections.size());
for (int i = 0; i < projections.size(); i++) {
RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
RETURN_IF_ERROR(projections[i]->execute(&input_block, new_columns[i]));
}

bytes_usage += input_block.allocated_bytes();
input_block.shuffle_columns(result_column_ids);
vectorized::Block tmp_block {new_columns};
bytes_usage += tmp_block.allocated_bytes();
input_block.swap(tmp_block);
}

DCHECK_EQ(rows, input_block.rows());
Expand Down
13 changes: 5 additions & 8 deletions be/src/pipeline/exec/partition_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
auto& p = _parent->cast<PartitionSortSinkOperatorX>();
RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
_partition_expr_ctxs.resize(p._partition_expr_ctxs.size());
_partition_columns.resize(p._partition_expr_ctxs.size());
for (size_t i = 0; i < p._partition_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state, _partition_expr_ctxs[i]));
}
Expand Down Expand Up @@ -181,15 +180,13 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*

Status PartitionSortSinkOperatorX::_split_block_by_partition(
vectorized::Block* input_block, PartitionSortSinkLocalState& local_state, bool eos) {
vectorized::ColumnRawPtrs key_columns_raw_ptr(_partition_exprs_num);
vectorized::Columns key_columns(_partition_exprs_num);
for (int i = 0; i < _partition_exprs_num; ++i) {
int result_column_id = -1;
RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, &result_column_id));
DCHECK(result_column_id != -1);
local_state._partition_columns[i] =
input_block->get_by_position(result_column_id).column.get();
RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, key_columns[i]));
key_columns_raw_ptr[i] = key_columns[i].get();
}
RETURN_IF_ERROR(_emplace_into_hash_table(local_state._partition_columns, input_block,
local_state, eos));
RETURN_IF_ERROR(_emplace_into_hash_table(key_columns_raw_ptr, input_block, local_state, eos));
return Status::OK();
}

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState<PartitionSort
int64_t _sorted_partition_input_rows = 0;
std::vector<PartitionDataPtr> _value_places;
int _num_partition = 0;
std::vector<const vectorized::IColumn*> _partition_columns;
std::unique_ptr<PartitionedHashMapVariants> _partitioned_data;
std::unique_ptr<vectorized::Arena> _agg_arena_pool;
int _partition_exprs_num = 0;
Expand Down
12 changes: 5 additions & 7 deletions be/src/pipeline/exec/repeat_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "pipeline/exec/operator.h"
#include "vec/common/assert_cast.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"

namespace doris {
#include "common/compile_check_begin.h"
Expand Down Expand Up @@ -190,13 +191,10 @@ Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block
intermediate_block = vectorized::Block::create_unique();

for (auto& expr : expr_ctxs) {
int result_column_id = -1;
RETURN_IF_ERROR(expr->execute(input_block, &result_column_id));
DCHECK(result_column_id != -1);
input_block->get_by_position(result_column_id).column =
input_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
intermediate_block->insert(input_block->get_by_position(result_column_id));
vectorized::ColumnWithTypeAndName result_data;
RETURN_IF_ERROR(expr->execute(input_block, result_data));
result_data.column = result_data.column->convert_to_full_column_if_const();
intermediate_block->insert(result_data);
}
DCHECK_EQ(expr_ctxs.size(), intermediate_block->columns());
}
Expand Down
7 changes: 4 additions & 3 deletions be/src/pipeline/exec/union_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "common/status.h"
#include "operator.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"

namespace doris {
#include "common/compile_check_begin.h"
Expand Down Expand Up @@ -166,9 +167,9 @@ class UnionSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<UnionSink
const auto& child_exprs = local_state._child_expr;
vectorized::ColumnsWithTypeAndName colunms;
for (size_t i = 0; i < child_exprs.size(); ++i) {
int result_column_id = -1;
RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id));
colunms.emplace_back(src_block->get_by_position(result_column_id));
vectorized::ColumnWithTypeAndName result_data;
RETURN_IF_ERROR(child_exprs[i]->execute(src_block, result_data));
colunms.emplace_back(result_data);
}
local_state._child_row_idx += src_block->rows();
*res_block = {colunms};
Expand Down
12 changes: 3 additions & 9 deletions be/src/vec/common/sort/sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,12 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block, bool reversed)
size_t num_cols = src_block.columns();
if (_materialize_sort_exprs) {
auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
ColumnsWithTypeAndName columns_data(output_tuple_expr_ctxs.size());
for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) {
RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&src_block, &valid_column_ids[i]));
RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&src_block, columns_data[i]));
}

Block new_block;
for (auto column_id : valid_column_ids) {
if (column_id < 0) {
continue;
}
new_block.insert(src_block.get_by_position(column_id));
}
Block new_block {columns_data};
dest_block.swap(new_block);
}

Expand Down
15 changes: 4 additions & 11 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1495,18 +1495,11 @@ Status NewJsonReader::_get_column_default_value(
if (ctx->root()->node_type() == TExprNodeType::type::NULL_LITERAL) {
continue;
}
// empty block to save default value of slot_desc->col_name()
Block block;
// If block is empty, some functions will produce no result. So we insert a column with
// single value here.
block.insert({ColumnUInt8::create(1), std::make_shared<DataTypeUInt8>(), ""});
int result = -1;
RETURN_IF_ERROR(ctx->execute(&block, &result));
DCHECK(result != -1);
auto column = block.get_by_position(result).column;
DCHECK(column->size() == 1);
ColumnWithTypeAndName result;
RETURN_IF_ERROR(ctx->execute_const_expr(result));
DCHECK(result.column->size() == 1);
_col_default_value_map.emplace(slot_desc->col_name(),
column->get_data_at(0).to_string());
result.column->get_data_at(0).to_string());
}
}
return Status::OK();
Expand Down
12 changes: 3 additions & 9 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,6 @@ Status OrcReader::_fill_partition_columns(
Status OrcReader::_fill_missing_columns(
Block* block, uint64_t rows,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
std::set<size_t> positions_to_erase;
for (const auto& kv : missing_columns) {
if (!_col_name_to_block_idx->contains(kv.first)) {
return Status::InternalError("Failed to find missing column: {}, block: {}", kv.first,
Expand All @@ -1374,16 +1373,13 @@ Status OrcReader::_fill_missing_columns(
} else {
// fill with default value
const auto& ctx = kv.second;
auto origin_column_num = block->columns();
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
bool is_origin_column = result_column_id < origin_column_num;
if (!is_origin_column) {
ColumnPtr result_column_ptr;
RETURN_IF_ERROR(ctx->execute(block, result_column_ptr));
if (result_column_ptr->use_count() == 1) {
// call resize because the first column of _src_block_ptr may not be filled by reader,
// so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
// has only one row.
auto result_column_ptr = block->get_by_position(result_column_id).column;
auto mutable_column = result_column_ptr->assume_mutable();
mutable_column->resize(rows);
// result_column_ptr maybe a ColumnConst, convert it to a normal column
Expand All @@ -1394,11 +1390,9 @@ Status OrcReader::_fill_missing_columns(
block->replace_by_position(
(*_col_name_to_block_idx)[kv.first],
is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
positions_to_erase.insert(result_column_id);
}
}
}
block->erase(positions_to_erase);
return Status::OK();
}

Expand Down
12 changes: 3 additions & 9 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,6 @@ Status RowGroupReader::_fill_partition_columns(
Status RowGroupReader::_fill_missing_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
std::set<size_t> positions_to_erase;
for (const auto& kv : missing_columns) {
if (!_col_name_to_block_idx->contains(kv.first)) {
return Status::InternalError("Missing column: {} not found in block {}", kv.first,
Expand All @@ -714,16 +713,13 @@ Status RowGroupReader::_fill_missing_columns(
} else {
// fill with default value
const auto& ctx = kv.second;
auto origin_column_num = block->columns();
int result_column_id = -1;
ColumnPtr result_column_ptr;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
bool is_origin_column = result_column_id < origin_column_num;
if (!is_origin_column) {
RETURN_IF_ERROR(ctx->execute(block, result_column_ptr));
if (result_column_ptr->use_count() == 1) {
// call resize because the first column of _src_block_ptr may not be filled by reader,
// so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
// has only one row.
auto result_column_ptr = block->get_by_position(result_column_id).column;
auto mutable_column = result_column_ptr->assume_mutable();
mutable_column->resize(rows);
// result_column_ptr maybe a ColumnConst, convert it to a normal column
Expand All @@ -734,11 +730,9 @@ Status RowGroupReader::_fill_missing_columns(
block->replace_by_position(
(*_col_name_to_block_idx)[kv.first],
is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
positions_to_erase.insert(result_column_id);
}
}
}
block->erase(positions_to_erase);
return Status::OK();
}

Expand Down
14 changes: 4 additions & 10 deletions be/src/vec/exec/scan/file_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -679,16 +679,13 @@ Status FileScanner::_fill_missing_columns(size_t rows) {
} else {
// fill with default value
auto& ctx = kv.second;
auto origin_column_num = _src_block_ptr->columns();
int result_column_id = -1;
ColumnPtr result_column_ptr;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id));
bool is_origin_column = result_column_id < origin_column_num;
if (!is_origin_column) {
RETURN_IF_ERROR(ctx->execute(_src_block_ptr, result_column_ptr));
if (result_column_ptr->use_count() == 1) {
// call resize because the first column of _src_block_ptr may not be filled by reader,
// so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()`
// has only one row.
auto result_column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
auto mutable_column = result_column_ptr->assume_mutable();
mutable_column->resize(rows);
// result_column_ptr maybe a ColumnConst, convert it to a normal column
Expand All @@ -703,7 +700,6 @@ Status FileScanner::_fill_missing_columns(size_t rows) {
_src_block_ptr->replace_by_position(
_src_block_name_to_idx[kv.first],
is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
_src_block_ptr->erase(result_column_id);
}
}
}
Expand Down Expand Up @@ -777,10 +773,8 @@ Status FileScanner::_convert_to_output_block(Block* block) {
vectorized::ColumnPtr column_ptr;

auto& ctx = _dest_vexpr_ctx[dest_index];
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(_src_block_ptr, &result_column_id));
column_ptr = _src_block_ptr->get_by_position(result_column_id).column;
RETURN_IF_ERROR(ctx->execute(_src_block_ptr, column_ptr));
// column_ptr maybe a ColumnConst, convert it to a normal column
column_ptr = column_ptr->convert_to_full_column_if_const();
DCHECK(column_ptr);
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exprs/table_function/vexplode_bitmap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ Status VExplodeBitmapTableFunction::process_init(Block* block, RuntimeState* sta
<< "VExplodeNumbersTableFunction must be have 1 children but have "
<< _expr_context->root()->children().size();

int value_column_idx = -1;
RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute(_expr_context.get(), block,
&value_column_idx));
_value_column = block->get_by_position(value_column_idx).column;
RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute_column(
_expr_context.get(), block, block->rows(), _value_column));

return Status::OK();
}
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exprs/table_function/vexplode_json_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@ Status VExplodeJsonObjectTableFunction::process_init(Block* block, RuntimeState*
<< "VExplodeJsonObjectTableFunction only support 1 child but has "
<< _expr_context->root()->children().size();

int text_column_idx = -1;
RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute(_expr_context.get(), block,
&text_column_idx));
RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute_column(
_expr_context.get(), block, block->rows(), _json_object_column));

_json_object_column = block->get_by_position(text_column_idx).column;
return Status::OK();
}

Expand Down
12 changes: 12 additions & 0 deletions be/src/vec/exprs/vexpr_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ Status VExprContext::execute(const Block* block, ColumnPtr& result_column) {
return st;
}

Status VExprContext::execute(const Block* block, ColumnWithTypeAndName& result_data) {
Status st;
ColumnPtr result_column;
RETURN_IF_CATCH_EXCEPTION(
{ st = _root->execute_column(this, block, block->rows(), result_column); });
RETURN_IF_ERROR(st);
result_data.column = result_column;
result_data.type = execute_type(block);
result_data.name = _root->expr_name();
return Status::OK();
}

DataTypePtr VExprContext::execute_type(const Block* block) {
return _root->execute_type(block);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exprs/vexpr_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class VExprContext {
[[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx);
[[nodiscard]] Status execute(Block* block, int* result_column_id);
[[nodiscard]] Status execute(const Block* block, ColumnPtr& result_column);
[[nodiscard]] Status execute(const Block* block, ColumnWithTypeAndName& result_data);
[[nodiscard]] DataTypePtr execute_type(const Block* block);
[[nodiscard]] const std::string& expr_name() const;
[[nodiscard]] bool is_blockable() const;
Expand Down
Loading