Skip to content

Commit 980c188

Browse files
committed
[Feature](func) Support table function json_each, json_each_text
1 parent 3bf910c commit 980c188

11 files changed

Lines changed: 1045 additions & 1 deletion

File tree

be/src/vec/exprs/table_function/table_function_factory.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "vec/exprs/table_function/vexplode_map.h"
3434
#include "vec/exprs/table_function/vexplode_numbers.h"
3535
#include "vec/exprs/table_function/vexplode_v2.h"
36+
#include "vec/exprs/table_function/vjson_each.h"
3637
#include "vec/utils/util.hpp"
3738

3839
namespace doris::vectorized {
@@ -50,6 +51,8 @@ const std::unordered_map<std::string, std::function<std::unique_ptr<TableFunctio
5051
{"explode_bitmap", TableFunctionCreator<VExplodeBitmapTableFunction>()},
5152
{"explode_map", TableFunctionCreator<VExplodeMapTableFunction> {}},
5253
{"explode_json_object", TableFunctionCreator<VExplodeJsonObjectTableFunction> {}},
54+
{"json_each", TableFunctionCreator<VJsonEachTableFn> {}},
55+
{"json_each_text", TableFunctionCreator<VJsonEachTextTableFn> {}},
5356
{"posexplode", TableFunctionCreator<VExplodeV2TableFunction> {}},
5457
{"explode", TableFunctionCreator<VExplodeV2TableFunction> {}},
5558
{"explode_variant_array_old", TableFunctionCreator<VExplodeTableFunction>()},
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "vec/exprs/table_function/vjson_each.h"
19+
20+
#include <glog/logging.h>
21+
22+
#include <ostream>
23+
#include <string>
24+
25+
#include "common/status.h"
26+
#include "util/jsonb_document.h"
27+
#include "util/jsonb_utils.h"
28+
#include "util/jsonb_writer.h"
29+
#include "vec/columns/column.h"
30+
#include "vec/columns/column_const.h"
31+
#include "vec/columns/column_struct.h"
32+
#include "vec/common/assert_cast.h"
33+
#include "vec/common/string_ref.h"
34+
#include "vec/core/block.h"
35+
#include "vec/core/column_with_type_and_name.h"
36+
#include "vec/exprs/vexpr.h"
37+
#include "vec/exprs/vexpr_context.h"
38+
39+
namespace doris::vectorized {
40+
#include "common/compile_check_begin.h"
41+
42+
template <bool TEXT_MODE>
43+
VJsonEachTableFunction<TEXT_MODE>::VJsonEachTableFunction() {
44+
_fn_name = TEXT_MODE ? "vjson_each_text" : "vjson_each";
45+
}
46+
47+
template <bool TEXT_MODE>
48+
Status VJsonEachTableFunction<TEXT_MODE>::process_init(Block* block, RuntimeState* /*state*/) {
49+
int value_column_idx = -1;
50+
RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute(_expr_context.get(), block,
51+
&value_column_idx));
52+
auto [col, is_const] = unpack_if_const(block->get_by_position(value_column_idx).column);
53+
_json_column = col;
54+
_is_const = is_const;
55+
return Status::OK();
56+
}
57+
58+
// Helper: insert one JsonbValue as plain text into a ColumnNullable<ColumnString>.
59+
// For strings: raw blob content (quotes stripped, matching json_each_text PG semantics).
60+
// For null JSON values: SQL NULL (insert_default).
61+
// For all others (numbers, bools, objects, arrays): JSON text representation.
62+
static void insert_value_as_text(const JsonbValue* value, MutableColumnPtr& col) {
63+
if (value == nullptr || value->isNull()) {
64+
col->insert_default();
65+
return;
66+
}
67+
if (value->isString()) {
68+
const auto* str_val = value->unpack<JsonbStringVal>();
69+
col->insert_data(str_val->getBlob(), str_val->getBlobLen());
70+
} else {
71+
JsonbToJson converter;
72+
std::string text = converter.to_json_string(value);
73+
col->insert_data(text.data(), text.size());
74+
}
75+
}
76+
77+
// Helper: insert one JsonbValue in JSONB binary form into a ColumnNullable<ColumnString>.
78+
// For null JSON values: SQL NULL (insert_default).
79+
// For all others: write JSONB binary via JsonbWriter.
80+
static void insert_value_as_json(const JsonbValue* value, MutableColumnPtr& col,
81+
JsonbWriter& writer) {
82+
if (value == nullptr || value->isNull()) {
83+
col->insert_default();
84+
return;
85+
}
86+
writer.reset();
87+
writer.writeValue(value);
88+
const auto* buf = writer.getOutput()->getBuffer();
89+
size_t len = writer.getOutput()->getSize();
90+
col->insert_data(buf, len);
91+
}
92+
93+
template <bool TEXT_MODE>
94+
void VJsonEachTableFunction<TEXT_MODE>::process_row(size_t row_idx) {
95+
TableFunction::process_row(row_idx);
96+
97+
StringRef text;
98+
const size_t idx = _is_const ? 0 : row_idx;
99+
if (const auto* nullable_col = check_and_get_column<ColumnNullable>(*_json_column)) {
100+
if (nullable_col->is_null_at(idx)) {
101+
return;
102+
}
103+
text = assert_cast<const ColumnString&>(nullable_col->get_nested_column()).get_data_at(idx);
104+
} else {
105+
text = assert_cast<const ColumnString&>(*_json_column).get_data_at(idx);
106+
}
107+
108+
const JsonbDocument* doc = nullptr;
109+
auto st = JsonbDocument::checkAndCreateDocument(text.data, text.size, &doc);
110+
if (!st.ok() || !doc || !doc->getValue()) [[unlikely]] {
111+
return;
112+
}
113+
114+
const JsonbValue* jv = doc->getValue();
115+
if (!jv->isObject()) {
116+
return;
117+
}
118+
119+
const auto* obj = jv->unpack<ObjectVal>();
120+
_cur_size = obj->numElem();
121+
if (_cur_size == 0) {
122+
return;
123+
}
124+
125+
_kv_pairs.first = ColumnNullable::create(ColumnString::create(), ColumnUInt8::create());
126+
_kv_pairs.second = ColumnNullable::create(ColumnString::create(), ColumnUInt8::create());
127+
_kv_pairs.first->reserve(_cur_size);
128+
_kv_pairs.second->reserve(_cur_size);
129+
130+
if constexpr (TEXT_MODE) {
131+
for (const auto& kv : *obj) {
132+
_kv_pairs.first->insert_data(kv.getKeyStr(), kv.klen());
133+
insert_value_as_text(kv.value(), _kv_pairs.second);
134+
}
135+
} else {
136+
JsonbWriter writer;
137+
for (const auto& kv : *obj) {
138+
_kv_pairs.first->insert_data(kv.getKeyStr(), kv.klen());
139+
insert_value_as_json(kv.value(), _kv_pairs.second, writer);
140+
}
141+
}
142+
}
143+
144+
template <bool TEXT_MODE>
145+
void VJsonEachTableFunction<TEXT_MODE>::process_close() {
146+
_json_column = nullptr;
147+
_kv_pairs.first = nullptr;
148+
_kv_pairs.second = nullptr;
149+
}
150+
151+
template <bool TEXT_MODE>
152+
void VJsonEachTableFunction<TEXT_MODE>::get_same_many_values(MutableColumnPtr& column, int length) {
153+
if (current_empty()) {
154+
column->insert_many_defaults(length);
155+
return;
156+
}
157+
158+
ColumnStruct* ret = nullptr;
159+
if (_is_nullable) {
160+
ret = assert_cast<ColumnStruct*>(
161+
assert_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get());
162+
assert_cast<ColumnUInt8*>(
163+
assert_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
164+
->insert_many_defaults(length);
165+
} else if (is_column<ColumnStruct>(column.get())) {
166+
ret = assert_cast<ColumnStruct*>(column.get());
167+
} else {
168+
throw Exception(ErrorCode::INTERNAL_ERROR, "json_each: unexpected output column type: {}",
169+
column->get_name());
170+
}
171+
172+
DCHECK(ret && ret->tuple_size() == 2)
173+
<< "json_each output struct must have exactly 2 fields (key, value)";
174+
175+
ret->get_column(0).insert_many_from(*_kv_pairs.first, _cur_offset, length);
176+
ret->get_column(1).insert_many_from(*_kv_pairs.second, _cur_offset, length);
177+
}
178+
179+
template <bool TEXT_MODE>
180+
int VJsonEachTableFunction<TEXT_MODE>::get_value(MutableColumnPtr& column, int max_step) {
181+
max_step = std::min(max_step, (int)(_cur_size - _cur_offset));
182+
183+
if (current_empty()) {
184+
column->insert_default();
185+
max_step = 1;
186+
} else {
187+
ColumnStruct* struct_col = nullptr;
188+
if (_is_nullable) {
189+
auto* nullable_col = assert_cast<ColumnNullable*>(column.get());
190+
struct_col = assert_cast<ColumnStruct*>(nullable_col->get_nested_column_ptr().get());
191+
assert_cast<ColumnUInt8*>(nullable_col->get_null_map_column_ptr().get())
192+
->insert_many_defaults(max_step);
193+
} else {
194+
struct_col = assert_cast<ColumnStruct*>(column.get());
195+
}
196+
197+
DCHECK(struct_col && struct_col->tuple_size() == 2)
198+
<< "json_each output struct must have exactly 2 fields (key, value)";
199+
200+
struct_col->get_column(0).insert_range_from(*_kv_pairs.first, _cur_offset, max_step);
201+
struct_col->get_column(1).insert_range_from(*_kv_pairs.second, _cur_offset, max_step);
202+
}
203+
204+
forward(max_step);
205+
return max_step;
206+
}
207+
208+
// // Explicit template instantiations
209+
template class VJsonEachTableFunction<false>; // json_each
210+
template class VJsonEachTableFunction<true>; // json_each_text
211+
212+
#include "common/compile_check_end.h"
213+
} // namespace doris::vectorized
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <cstddef>
21+
22+
#include "common/status.h"
23+
#include "vec/data_types/data_type.h"
24+
#include "vec/exprs/table_function/table_function.h"
25+
26+
namespace doris::vectorized {
27+
#include "common/compile_check_begin.h"
28+
class Block;
29+
30+
// json_each('{"a":"foo","b":123}') →
31+
// | key | value |
32+
// | a | "foo" (JSON) |
33+
// | b | 123 (JSON) |
34+
//
35+
// json_each_text('{"a":"foo","b":123}') →
36+
// | key | value |
37+
// | a | foo | ← string unquoted
38+
// | b | 123 | ← number as text
39+
//
40+
// TEXT_MODE=false → json_each (value column type: JSONB binary)
41+
// TEXT_MODE=true → json_each_text (value column type: plain STRING)
42+
template <bool TEXT_MODE>
43+
class VJsonEachTableFunction : public TableFunction {
44+
ENABLE_FACTORY_CREATOR(VJsonEachTableFunction);
45+
46+
public:
47+
VJsonEachTableFunction();
48+
49+
~VJsonEachTableFunction() override = default;
50+
51+
Status process_init(Block* block, RuntimeState* state) override;
52+
void process_row(size_t row_idx) override;
53+
void process_close() override;
54+
void get_same_many_values(MutableColumnPtr& column, int length) override;
55+
int get_value(MutableColumnPtr& column, int max_step) override;
56+
57+
private:
58+
ColumnPtr _json_column;
59+
// _kv_pairs.first : ColumnNullable<ColumnString> key (always plain text)
60+
// _kv_pairs.second : ColumnNullable<ColumnString> value (JSONB bytes or plain text)
61+
std::pair<MutableColumnPtr, MutableColumnPtr> _kv_pairs;
62+
};
63+
64+
using VJsonEachTableFn = VJsonEachTableFunction<false>;
65+
using VJsonEachTextTableFn = VJsonEachTableFunction<true>;
66+
67+
#include "common/compile_check_end.h"
68+
} // namespace doris::vectorized

be/src/vec/functions/function_fake.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,34 @@ struct FunctionExplodeJsonObject {
151151
static std::string get_error_msg() { return "Fake function do not support execute"; }
152152
};
153153

154+
// json_each(json) -> Nullable(Struct(key Nullable(String), value Nullable(JSONB)))
155+
struct FunctionJsonEach {
156+
static DataTypePtr get_return_type_impl(const DataTypes& arguments) {
157+
DCHECK_EQ(arguments[0]->get_primitive_type(), PrimitiveType::TYPE_JSONB)
158+
<< " json_each " << arguments[0]->get_name() << " not supported";
159+
DataTypes fieldTypes(2);
160+
fieldTypes[0] = make_nullable(std::make_shared<DataTypeString>());
161+
fieldTypes[1] = make_nullable(std::make_shared<DataTypeJsonb>());
162+
return make_nullable(std::make_shared<vectorized::DataTypeStruct>(fieldTypes));
163+
}
164+
static DataTypes get_variadic_argument_types() { return {}; }
165+
static std::string get_error_msg() { return "Fake function do not support execute"; }
166+
};
167+
168+
// json_each_text(json) -> Nullable(Struct(key Nullable(String), value Nullable(String)))
169+
struct FunctionJsonEachText {
170+
static DataTypePtr get_return_type_impl(const DataTypes& arguments) {
171+
DCHECK_EQ(arguments[0]->get_primitive_type(), PrimitiveType::TYPE_JSONB)
172+
<< " json_each_text " << arguments[0]->get_name() << " not supported";
173+
DataTypes fieldTypes(2);
174+
fieldTypes[0] = make_nullable(std::make_shared<DataTypeString>());
175+
fieldTypes[1] = make_nullable(std::make_shared<DataTypeString>());
176+
return make_nullable(std::make_shared<vectorized::DataTypeStruct>(fieldTypes));
177+
}
178+
static DataTypes get_variadic_argument_types() { return {}; }
179+
static std::string get_error_msg() { return "Fake function do not support execute"; }
180+
};
181+
154182
struct FunctionEsquery {
155183
static DataTypePtr get_return_type_impl(const DataTypes& arguments) {
156184
return FunctionFakeBaseImpl<DataTypeUInt8>::get_return_type_impl(arguments);
@@ -239,6 +267,8 @@ void register_function_fake(SimpleFunctionFactory& factory) {
239267
register_table_function_expand_outer<FunctionExplodeMap>(factory, "explode_map");
240268

241269
register_table_function_expand_outer<FunctionExplodeJsonObject>(factory, "explode_json_object");
270+
register_function<FunctionJsonEach>(factory, "json_each");
271+
register_function<FunctionJsonEachText>(factory, "json_each_text");
242272
register_table_function_expand_outer_default<DataTypeString, false>(factory, "explode_split");
243273
register_table_function_expand_outer_default<DataTypeInt32, false>(factory, "explode_numbers");
244274
register_table_function_expand_outer_default<DataTypeInt64, false>(factory,

0 commit comments

Comments
 (0)