Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions dbms/src/Functions/FunctionsJson.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <ext/range.h>
#include <limits>
#include <magic_enum.hpp>
#include <optional>
#include <string_view>
#include <type_traits>

Expand Down Expand Up @@ -435,7 +436,7 @@ class FunctionCastJsonAsString : public IFunction

bool useDefaultImplementationForConstants() const override { return true; }

void setOutputTiDBFieldType(const tipb::FieldType & tidb_tp_) { tidb_tp = &tidb_tp_; }
void setOutputTiDBFieldType(const tipb::FieldType & tidb_tp_) { tidb_tp = tidb_tp_; }

DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
Expand Down Expand Up @@ -463,7 +464,7 @@ class FunctionCastJsonAsString : public IFunction
ColumnUInt8::MutablePtr col_null_map = ColumnUInt8::create(rows, 0);
ColumnUInt8::Container & vec_null_map = col_null_map->getData();
JsonBinary::JsonBinaryWriteBuffer write_buffer(data_to);
if likely (tidb_tp->flen() < 0)
if likely (!tidb_tp.has_value() || tidb_tp->flen() < 0)
{
size_t current_offset = 0;
for (size_t i = 0; i < block.rows(); ++i)
Expand Down Expand Up @@ -526,7 +527,7 @@ class FunctionCastJsonAsString : public IFunction
}

private:
const tipb::FieldType * tidb_tp = nullptr;
std::optional<tipb::FieldType> tidb_tp;
const Context & context;
};

Expand Down Expand Up @@ -1364,7 +1365,7 @@ class FunctionCastIntAsJson : public IFunction
bool useDefaultImplementationForNulls() const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }

void setInputTiDBFieldType(const tipb::FieldType & tidb_tp_) { input_tidb_tp = &tidb_tp_; }
void setInputTiDBFieldType(const tipb::FieldType & tidb_tp_) { input_tidb_tp = tidb_tp_; }

DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
Expand All @@ -1390,7 +1391,7 @@ class FunctionCastIntAsJson : public IFunction
using IntFieldType = typename IntType::FieldType;
const auto & from = block.getByPosition(arguments[0]);
// In raw function test, input_tidb_tp is not set.
if (unlikely(input_tidb_tp == nullptr) || !hasIsBooleanFlag(*input_tidb_tp))
if (unlikely(!input_tidb_tp.has_value()) || !hasIsBooleanFlag(*input_tidb_tp))
{
if constexpr (std::is_unsigned_v<IntFieldType>)
doExecute<IntFieldType, UInt64>(data_to, offsets_to, from.column);
Expand Down Expand Up @@ -1454,7 +1455,7 @@ class FunctionCastIntAsJson : public IFunction
}

private:
const tipb::FieldType * input_tidb_tp = nullptr;
std::optional<tipb::FieldType> input_tidb_tp;
};

class FunctionCastStringAsJson : public IFunction
Expand All @@ -1470,8 +1471,8 @@ class FunctionCastStringAsJson : public IFunction
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForConstants() const override { return true; }

void setInputTiDBFieldType(const tipb::FieldType & tidb_tp_) { input_tidb_tp = &tidb_tp_; }
void setOutputTiDBFieldType(const tipb::FieldType & tidb_tp_) { output_tidb_tp = &tidb_tp_; }
void setInputTiDBFieldType(const tipb::FieldType & tidb_tp_) { input_tidb_tp = tidb_tp_; }
void setOutputTiDBFieldType(const tipb::FieldType & tidb_tp_) { output_tidb_tp = tidb_tp_; }
void setCollator(const TiDB::TiDBCollatorPtr & collator_) override { collator = collator_; }

DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
Expand Down Expand Up @@ -1512,7 +1513,7 @@ class FunctionCastStringAsJson : public IFunction
// In raw function test, input_tidb_tp/output_tidb_tp are not set.
if (collator && collator->isBinary())
{
if (unlikely(input_tidb_tp == nullptr))
if (unlikely(!input_tidb_tp.has_value()))
{
doExecuteForBinary<false, false>(
data_to,
Expand Down Expand Up @@ -1561,7 +1562,7 @@ class FunctionCastStringAsJson : public IFunction
block.rows());
}
}
else if ((unlikely(output_tidb_tp == nullptr)) || hasParseToJSONFlag(*output_tidb_tp))
else if (unlikely(!output_tidb_tp.has_value()) || hasParseToJSONFlag(*output_tidb_tp))
{
if (from.column->isColumnNullable())
{
Expand Down Expand Up @@ -1754,8 +1755,8 @@ class FunctionCastStringAsJson : public IFunction
}

private:
const tipb::FieldType * input_tidb_tp = nullptr;
const tipb::FieldType * output_tidb_tp = nullptr;
std::optional<tipb::FieldType> input_tidb_tp;
std::optional<tipb::FieldType> output_tidb_tp;
TiDB::TiDBCollatorPtr collator = nullptr;
};

Expand All @@ -1772,7 +1773,7 @@ class FunctionCastTimeAsJson : public IFunction
bool useDefaultImplementationForNulls() const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }

void setInputTiDBFieldType(const tipb::FieldType & tidb_tp_) { input_tidb_tp = &tidb_tp_; }
void setInputTiDBFieldType(const tipb::FieldType & tidb_tp_) { input_tidb_tp = tidb_tp_; }

DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
Expand All @@ -1795,7 +1796,7 @@ class FunctionCastTimeAsJson : public IFunction
if (checkDataType<DataTypeMyDateTime>(from.type.get()))
{
// In raw function test, input_tidb_tp is not set.
bool is_timestamp = (unlikely(input_tidb_tp == nullptr)) || input_tidb_tp->tp() == TiDB::TypeTimestamp;
bool is_timestamp = unlikely(!input_tidb_tp.has_value()) || input_tidb_tp->tp() == TiDB::TypeTimestamp;
if (is_timestamp)
doExecute<DataTypeMyDateTime, true>(data_to, offsets_to, from.column);
else
Expand Down Expand Up @@ -1845,7 +1846,7 @@ class FunctionCastTimeAsJson : public IFunction
}

private:
const tipb::FieldType * input_tidb_tp = nullptr;
std::optional<tipb::FieldType> input_tidb_tp;
};

class FunctionCastDurationAsJson : public IFunction
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/StorageDisaggregated.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Common/Logger.h>
#include <Common/config.h> // For ENABLE_NEXT_GEN_COLUMNAR
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/RemoteRequest.h>
Expand Down Expand Up @@ -153,11 +154,13 @@ class StorageDisaggregated : public IStorage
PipelineExecutorContext & exec_context,
PipelineExecGroupBuilder & group_builder,
DAGExpressionAnalyzer & analyzer);
#if ENABLE_NEXT_GEN_COLUMNAR
void filterConditionsWithPushedDownFilters(DAGExpressionAnalyzer & analyzer, DAGPipeline & pipeline);
void filterConditionsWithPushedDownFilters(
PipelineExecutorContext & exec_context,
PipelineExecGroupBuilder & group_builder,
DAGExpressionAnalyzer & analyzer);
#endif
ExpressionActionsPtr getExtraCastExpr(
DAGExpressionAnalyzer & analyzer,
bool include_pushed_down_filter_columns = false);
Expand Down
34 changes: 34 additions & 0 deletions dbms/src/Storages/StorageDisaggregatedColumnar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,40 @@ void normalizeTimestampCompareDateTimeLiteralToUTC(tipb::Expr & expr, const Time
}
} // namespace

/// Stream-pipeline overload.
/// Builds a FilterConditions from this storage's filter conditions plus the
/// table scan's pushed-down filters and, if the result has any condition,
/// executes it on `pipeline` and appends every resulting stream to the
/// profile-stream list keyed by the merged conditions' executor id.
void StorageDisaggregated::filterConditionsWithPushedDownFilters(
    DAGExpressionAnalyzer & analyzer,
    DAGPipeline & pipeline)
{
    // The proxy columnar reader applies late-materialization filters only to cut down
    // the packs read from disk; rows that fail those filters may still come through.
    // For correctness the pushed-down filters are therefore folded into the
    // FilterConditions and evaluated again inside the TiFlash pipeline.
    FilterConditions merged_conditions(filter_conditions.executor_id, filter_conditions.conditions);
    merged_conditions.conditions.MergeFrom(table_scan.getPushedDownFilters());

    // Nothing to filter on: leave the pipeline untouched.
    if (!merged_conditions.hasValue())
        return;

    ::DB::executePushedDownFilter(merged_conditions, analyzer, log, pipeline);

    auto & executor_profile_streams
        = context.getDAGContext()->getProfileStreamsMap()[merged_conditions.executor_id];
    pipeline.transform(
        [&executor_profile_streams](auto & stream) { executor_profile_streams.push_back(stream); });
}

/// Pipeline-exec overload.
/// Builds a FilterConditions from this storage's filter conditions plus the
/// table scan's pushed-down filters and, if the result has any condition,
/// executes it through `group_builder` and registers the builder's current
/// profile infos under the merged conditions' executor id.
void StorageDisaggregated::filterConditionsWithPushedDownFilters(
    PipelineExecutorContext & exec_context,
    PipelineExecGroupBuilder & group_builder,
    DAGExpressionAnalyzer & analyzer)
{
    // The proxy columnar reader applies late-materialization filters only to cut down
    // the packs read from disk; rows that fail those filters may still come through.
    // For correctness the pushed-down filters are therefore folded into the
    // FilterConditions and evaluated again inside the TiFlash pipeline.
    FilterConditions merged_conditions(filter_conditions.executor_id, filter_conditions.conditions);
    merged_conditions.conditions.MergeFrom(table_scan.getPushedDownFilters());

    // Nothing to filter on: leave the exec group untouched.
    if (!merged_conditions.hasValue())
        return;

    ::DB::executePushedDownFilter(exec_context, group_builder, merged_conditions, analyzer, log);
    context.getDAGContext()->addOperatorProfileInfos(
        merged_conditions.executor_id,
        group_builder.getCurProfileInfos());
}

BlockInputStreams StorageDisaggregated::readThroughColumnar(const Context & context, unsigned num_streams)
{
DAGPipeline pipeline;
Expand Down
36 changes: 0 additions & 36 deletions dbms/src/Storages/StorageDisaggregatedRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,42 +123,6 @@ void StorageDisaggregated::readThroughColumnar( // NOLINT(readability-convert-me
}
#endif

/// Stream-pipeline overload.
/// With ENABLE_NEXT_GEN_COLUMNAR on, merges the table scan's pushed-down
/// filters into a copy of this storage's filter conditions, executes the
/// merged conditions on `pipeline` (if any), and appends the resulting streams
/// to the profile-stream list keyed by the executor id.
/// With ENABLE_NEXT_GEN_COLUMNAR off, simply forwards to filterConditions().
void StorageDisaggregated::filterConditionsWithPushedDownFilters(
    DAGExpressionAnalyzer & analyzer,
    DAGPipeline & pipeline)
{
#if ENABLE_NEXT_GEN_COLUMNAR
    FilterConditions merged_conditions(filter_conditions.executor_id, filter_conditions.conditions);
    merged_conditions.conditions.MergeFrom(table_scan.getPushedDownFilters());

    // Nothing to filter on: leave the pipeline untouched.
    if (!merged_conditions.hasValue())
        return;

    ::DB::executePushedDownFilter(merged_conditions, analyzer, log, pipeline);

    auto & executor_profile_streams
        = context.getDAGContext()->getProfileStreamsMap()[merged_conditions.executor_id];
    pipeline.transform(
        [&executor_profile_streams](auto & stream) { executor_profile_streams.push_back(stream); });
#else
    filterConditions(analyzer, pipeline);
#endif
}

/// Pipeline-exec overload.
/// With ENABLE_NEXT_GEN_COLUMNAR on, merges the table scan's pushed-down
/// filters into a copy of this storage's filter conditions, executes the
/// merged conditions through `group_builder` (if any), and registers the
/// builder's current profile infos under the executor id.
/// With ENABLE_NEXT_GEN_COLUMNAR off, simply forwards to filterConditions().
void StorageDisaggregated::filterConditionsWithPushedDownFilters(
    PipelineExecutorContext & exec_context,
    PipelineExecGroupBuilder & group_builder,
    DAGExpressionAnalyzer & analyzer)
{
#if ENABLE_NEXT_GEN_COLUMNAR
    FilterConditions merged_conditions(filter_conditions.executor_id, filter_conditions.conditions);
    merged_conditions.conditions.MergeFrom(table_scan.getPushedDownFilters());

    // Nothing to filter on: leave the exec group untouched.
    if (!merged_conditions.hasValue())
        return;

    ::DB::executePushedDownFilter(exec_context, group_builder, merged_conditions, analyzer, log);
    context.getDAGContext()->addOperatorProfileInfos(
        merged_conditions.executor_id,
        group_builder.getCurProfileInfos());
#else
    filterConditions(exec_context, group_builder, analyzer);
#endif
}

BlockInputStreams StorageDisaggregated::readThroughTiFlashWrite(const Context & db_context, unsigned num_streams)
{
auto * dag_context = context.getDAGContext();
Expand Down
86 changes: 86 additions & 0 deletions tests/fullstack-test/expr/cast_as_json_issue10845.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2026 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Regression for #10845. On next-gen columnar, filters on
# JSON_EXTRACT(cast(text as json), ...) could return inverted results
# when Selection predicates are merged with TableScan pushed-down filters.

mysql> drop table if exists test.event_log_text;
mysql> drop table if exists test.event_log_json;

mysql> create table test.event_log_text (event_timestamp bigint not null, approximate_arrival_timestamp datetime not null, action_params text default null);
mysql> create table test.event_log_json (event_timestamp bigint not null, approximate_arrival_timestamp datetime not null, action_params json default null);

mysql> insert into test.event_log_text (event_timestamp, approximate_arrival_timestamp, action_params) values (1747312496000, '2026-05-15 12:34:56', '{\"popup_id\":\"123\"}'), (1747312556000, '2026-05-15 12:35:56', '{\"popup_id\":\"123\"}'), (1747312616000, '2026-05-15 12:36:56', '{\"popup_id\":\"0\"}'), (1747312676000, '2026-05-15 12:37:56', '{\"popup_id\":\"456\"}'), (1747312736000, '2026-05-15 12:38:56', '{\"popup_id\":\"789\"}'); #NO_UNESCAPE

mysql> insert into test.event_log_json (event_timestamp, approximate_arrival_timestamp, action_params) values (1747312496000, '2026-05-15 12:34:56', '{\"popup_id\":\"123\"}'), (1747312556000, '2026-05-15 12:35:56', '{\"popup_id\":\"123\"}'), (1747312616000, '2026-05-15 12:36:56', '{\"popup_id\":\"0\"}'), (1747312676000, '2026-05-15 12:37:56', '{\"popup_id\":\"456\"}'), (1747312736000, '2026-05-15 12:38:56', '{\"popup_id\":\"789\"}'); #NO_UNESCAPE

mysql> alter table test.event_log_text set tiflash replica 1;
mysql> alter table test.event_log_json set tiflash replica 1;

func> wait_table test event_log_text
func> wait_table test event_log_json

mysql> set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select count(*) as c from test.event_log_text where json_extract(action_params, '$.popup_id') is null;
+---+
| c |
+---+
| 0 |
+---+

mysql> set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select count(*) as c from test.event_log_text where json_extract(action_params, '$.popup_id') is not null;
+---+
| c |
+---+
| 5 |
+---+

mysql> set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select count(*) as c from test.event_log_json where json_extract(action_params, '$.popup_id') is null;
+---+
| c |
+---+
| 0 |
+---+

mysql> set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select count(*) as c from test.event_log_json where json_extract(action_params, '$.popup_id') is not null;
+---+
| c |
+---+
| 5 |
+---+

mysql> set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select event_timestamp, json_unquote(json_extract(action_params, '$.popup_id')) as popup_id from test.event_log_text where json_extract(action_params, '$.popup_id') is not null order by event_timestamp; #NO_UNESCAPE
+-----------------+----------------+
| event_timestamp | popup_id |
+-----------------+----------------+
| 1747312496000 | 123 |
| 1747312556000 | 123 |
| 1747312616000 | 0 |
| 1747312676000 | 456 |
| 1747312736000 | 789 |
+-----------------+----------------+

mysql> set tidb_enforce_mpp=1; set tidb_isolation_read_engines='tiflash'; select event_timestamp, json_unquote(json_extract(action_params, '$.popup_id')) as popup_id from test.event_log_json where json_extract(action_params, '$.popup_id') is not null order by event_timestamp; #NO_UNESCAPE
+-----------------+----------------+
| event_timestamp | popup_id |
+-----------------+----------------+
| 1747312496000 | 123 |
| 1747312556000 | 123 |
| 1747312616000 | 0 |
| 1747312676000 | 456 |
| 1747312736000 | 789 |
+-----------------+----------------+

mysql> drop table if exists test.event_log_text;
mysql> drop table if exists test.event_log_json;