Skip to content

Commit 8dfcf74

Browse files
committed
executor: disable runtime filter on join key type mismatch
1 parent 01b12dd commit 8dfcf74

2 files changed

Lines changed: 182 additions & 3 deletions

File tree

dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <Flash/Coprocessor/DAGContext.h>
2222
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
2323
#include <Flash/Coprocessor/DAGPipeline.h>
24+
#include <Flash/Coprocessor/DAGUtils.h>
2425
#include <Flash/Coprocessor/InterpreterUtils.h>
2526
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
2627
#include <Flash/Pipeline/PipelineBuilder.h>
@@ -32,7 +33,6 @@
3233
#include <Interpreters/Context.h>
3334
#include <common/logger_useful.h>
3435
#include <fmt/format.h>
35-
3636
namespace DB
3737
{
3838
namespace FailPoints
@@ -158,8 +158,55 @@ PhysicalPlanNodePtr PhysicalJoin::build(
158158
{
159159
build_key_names_map[original_build_key_names[i]] = build_key_names[i];
160160
}
161-
auto runtime_filter_list
162-
= tiflash_join.genRuntimeFilterList(context, build_source_columns, build_key_names_map, log);
161+
162+
// Conservative correctness guard:
163+
// If join key *protobuf field types* across sides are not compatible, skip runtime filter as early as possible
164+
// to avoid wrong filtering / wasted work.
165+
//
166+
// Why here:
167+
// - We haven't created/registered any RuntimeFilter yet.
168+
// - We still have access to `tipb::Join` and can cheaply check original field types.
169+
//
170+
// NOTE: This is intentionally conservative. Join itself will still work because join keys are cast to a common
171+
// type for execution, but RF's Set header/value normalization may not be safe under mismatched signed/unsigned
172+
// or integer/decimal scenarios.
173+
auto is_join_key_field_type_compatible = [&]() -> bool {
174+
const int n = join.left_join_keys_size();
175+
if (n != join.right_join_keys_size())
176+
return false;
177+
for (int i = 0; i < n; ++i)
178+
{
179+
if (unlikely(
180+
!exprHasValidFieldType(join.left_join_keys(i)) || !exprHasValidFieldType(join.right_join_keys(i))))
181+
return false;
182+
183+
const auto & lt = join.left_join_keys(i).field_type();
184+
const auto & rt = join.right_join_keys(i).field_type();
185+
186+
// If TiDB says the two sides are different basic tp, we don't try to be smart here.
187+
if (lt.tp() != rt.tp())
188+
return false;
189+
190+
// Signed/unsigned mismatch: when the tp is integer-like, TiDB encodes unsigned via flag.
191+
// This is the known problematic case for RF(IN).
192+
if (hasUnsignedFlag(lt) != hasUnsignedFlag(rt))
193+
return false;
194+
}
195+
return true;
196+
};
197+
198+
const bool enable_runtime_filter = is_join_key_field_type_compatible();
199+
if (!enable_runtime_filter && !join.runtime_filter_list().empty())
200+
{
201+
LOG_INFO(
202+
log,
203+
"Disable runtime filter for join {} due to join-side key type mismatch (left/right key field types differ)",
204+
executor_id);
205+
}
206+
207+
auto runtime_filter_list = enable_runtime_filter
208+
? tiflash_join.genRuntimeFilterList(context, build_source_columns, build_key_names_map, log)
209+
: std::vector<RuntimeFilterPtr>{};
163210
LOG_DEBUG(log, "before register runtime filter list, list size:{}", runtime_filter_list.size());
164211
context.getDAGContext()->runtime_filter_mgr.registerRuntimeFilterList(runtime_filter_list);
165212

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright 2023 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <Debug/MockRuntimeFilter.h>
16+
#include <Interpreters/Context.h>
17+
#include <TestUtils/ExecutorTestUtils.h>
18+
19+
namespace DB
20+
{
21+
namespace tests
22+
{
23+
24+
/// Coverage for the conservative guard in `PhysicalJoin::build`:
25+
/// - If join-side key protobuf field types are not compatible, we should skip creating/registering runtime filter.
26+
///
27+
/// Behavioral contract in this test:
28+
/// - We still attach a RuntimeFilter request in the mock DAG.
29+
/// - Because key types mismatch, the optimization should be disabled (i.e. no filtering happens).
30+
/// - The join result should remain correct (same row count as “without runtime filter”).
31+
class RuntimeFilterDisableOnTypeMismatchTestRunner : public DB::tests::ExecutorTest
32+
{
33+
public:
34+
void initializeContext() override
35+
{
36+
ExecutorTest::initializeContext();
37+
context.mockStorage()->setUseDeltaMerge(true);
38+
}
39+
40+
static constexpr size_t concurrency = 10;
41+
};
42+
43+
#define WrapForRuntimeFilterTestBegin \
44+
std::vector<bool> pipelineBools{false, true}; \
45+
for (auto enablePipelineFlag : pipelineBools) \
46+
{ \
47+
enablePipeline(enablePipelineFlag);
48+
49+
#define WrapForRuntimeFilterTestEnd }
50+
51+
TEST_F(RuntimeFilterDisableOnTypeMismatchTestRunner, DisableRuntimeFilterWhenJoinKeyFieldTypeMismatch)
52+
try
53+
{
54+
context.context->getSettingsRef().dt_segment_stable_pack_rows = 1;
55+
context.context->getSettingsRef().dt_segment_limit_rows = 1;
56+
context.context->getSettingsRef().dt_segment_delta_cache_limit_rows = 1;
57+
context.context->getSettingsRef().dt_segment_force_split_size = 70;
58+
context.context->getSettingsRef().enable_hash_join_v2 = false;
59+
60+
// Probe(left) join key: Int32
61+
// Note: When using DeltaMerge in tests, the primary key column is expected to be representable by integer.
62+
// So we add an explicit integer handle column `pk` as the primary key and keep `k1` as a normal column.
63+
context.addMockDeltaMerge(
64+
{"test_db", "left_table"},
65+
{{"pk", TiDB::TP::TypeLongLong, false}, {"k1", TiDB::TP::TypeLong}},
66+
{toVec<Int64>("pk", {1, 2, 3}), toNullableVec<Int32>("k1", {1, 2, 3})},
67+
concurrency);
68+
69+
// Build(right) join key: Int64 (mismatch)
70+
context.addExchangeReceiver(
71+
"right_exchange_table_i64",
72+
{{"k1", TiDB::TP::TypeLongLong}},
73+
{toNullableVec<Int64>("k1", {2, 2, 3, 4})});
74+
75+
// Build(right) join key: UInt32 (mismatch due to unsigned flag)
76+
context.addExchangeReceiver(
77+
"right_exchange_table_u32",
78+
{{"k1", TiDB::TP::TypeLong, true}},
79+
{toNullableVec<UInt32>("k1", {2, 2, 3, 4})});
80+
81+
WrapForRuntimeFilterTestBegin
82+
{
83+
// Baseline: without runtime filter.
84+
auto request
85+
= context.scan("test_db", "left_table")
86+
.join(context.receive("right_exchange_table_i64"), tipb::JoinType::TypeInnerJoin, {col("k1")})
87+
.build(context);
88+
Expect expect{
89+
{"table_scan_0", {not_check_rows, not_check_concurrency}},
90+
{"exchange_receiver_1", {4, concurrency}},
91+
{"Join_2", {3, concurrency}}};
92+
testForExecutionSummary(request, expect);
93+
}
94+
95+
{
96+
// With runtime filter requested, but type mismatch => runtime filter should be disabled.
97+
mock::MockRuntimeFilter rf(1, col("k1"), col("k1"), "exchange_receiver_1", "table_scan_0");
98+
auto request
99+
= context.scan("test_db", "left_table", std::vector<int>{1})
100+
.join(context.receive("right_exchange_table_i64"), tipb::JoinType::TypeInnerJoin, {col("k1")}, rf)
101+
.build(context);
102+
// Expect no RF pruning, same as baseline.
103+
Expect expect{
104+
{"table_scan_0", {not_check_rows, not_check_concurrency}},
105+
{"exchange_receiver_1", {4, concurrency}},
106+
{"Join_2", {3, concurrency}}};
107+
testForExecutionSummary(request, expect);
108+
}
109+
110+
{
111+
// With runtime filter requested, but signed/unsigned mismatch => runtime filter should be disabled.
112+
mock::MockRuntimeFilter rf(1, col("k1"), col("k1"), "exchange_receiver_1", "table_scan_0");
113+
auto request
114+
= context.scan("test_db", "left_table", std::vector<int>{1})
115+
.join(context.receive("right_exchange_table_u32"), tipb::JoinType::TypeInnerJoin, {col("k1")}, rf)
116+
.build(context);
117+
// Expect no RF pruning, same as baseline.
118+
Expect expect{
119+
{"table_scan_0", {not_check_rows, not_check_concurrency}},
120+
{"exchange_receiver_1", {4, concurrency}},
121+
{"Join_2", {3, concurrency}}};
122+
testForExecutionSummary(request, expect);
123+
}
124+
WrapForRuntimeFilterTestEnd
125+
}
126+
CATCH
127+
128+
#undef WrapForRuntimeFilterTestBegin
129+
#undef WrapForRuntimeFilterTestEnd
130+
131+
} // namespace tests
132+
} // namespace DB

0 commit comments

Comments
 (0)