Skip to content

Commit f61f401

Browse files
committed
executor: disable runtime filter on join key type mismatch
1 parent 01b12dd commit f61f401

2 files changed

Lines changed: 186 additions & 6 deletions

File tree

dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <Flash/Coprocessor/DAGContext.h>
2222
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
2323
#include <Flash/Coprocessor/DAGPipeline.h>
24+
#include <Flash/Coprocessor/DAGUtils.h>
2425
#include <Flash/Coprocessor/InterpreterUtils.h>
2526
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
2627
#include <Flash/Pipeline/PipelineBuilder.h>
@@ -32,7 +33,6 @@
3233
#include <Interpreters/Context.h>
3334
#include <common/logger_useful.h>
3435
#include <fmt/format.h>
35-
3636
namespace DB
3737
{
3838
namespace FailPoints
@@ -158,10 +158,60 @@ PhysicalPlanNodePtr PhysicalJoin::build(
158158
{
159159
build_key_names_map[original_build_key_names[i]] = build_key_names[i];
160160
}
161-
auto runtime_filter_list
162-
= tiflash_join.genRuntimeFilterList(context, build_source_columns, build_key_names_map, log);
163-
LOG_DEBUG(log, "before register runtime filter list, list size:{}", runtime_filter_list.size());
164-
context.getDAGContext()->runtime_filter_mgr.registerRuntimeFilterList(runtime_filter_list);
161+
162+
// Conservative correctness guard:
163+
// If join key *protobuf field types* across sides are not compatible, skip runtime filter as early as possible
164+
// to avoid wrong filtering / wasted work.
165+
//
166+
// Why here:
167+
// - We haven't created/registered any RuntimeFilter yet.
168+
// - We still have access to `tipb::Join` and can cheaply check original field types.
169+
//
170+
// NOTE: This is intentionally conservative. Join itself will still work because join keys are cast to a common
171+
// type for execution, but RF's Set header/value normalization may not be safe under mismatched signed/unsigned
172+
// or integer/decimal scenarios.
173+
auto isJoinKeyFieldTypeCompatible = [&]() -> bool {
174+
const int n = join.left_join_keys_size();
175+
if (n != join.right_join_keys_size())
176+
return false;
177+
for (int i = 0; i < n; ++i)
178+
{
179+
if (unlikely(
180+
!exprHasValidFieldType(join.left_join_keys(i)) || !exprHasValidFieldType(join.right_join_keys(i))))
181+
return false;
182+
183+
const auto & lt = join.left_join_keys(i).field_type();
184+
const auto & rt = join.right_join_keys(i).field_type();
185+
186+
// If TiDB says the two sides are different basic tp, we don't try to be smart here.
187+
if (lt.tp() != rt.tp())
188+
return false;
189+
190+
// Signed/unsigned mismatch: when the tp is integer-like, TiDB encodes unsigned via flag.
191+
// This is the known problematic case for RF(IN).
192+
if (hasUnsignedFlag(lt) != hasUnsignedFlag(rt))
193+
return false;
194+
195+
// Otherwise keep conservative: if we got here (same tp + same unsigned flag), treat as compatible.
196+
continue;
197+
}
198+
return true;
199+
};
200+
201+
const bool enableRuntimeFilter = isJoinKeyFieldTypeCompatible();
202+
if (!enableRuntimeFilter && !join.runtime_filter_list().empty())
203+
{
204+
LOG_INFO(
205+
log,
206+
"Disable runtime filter for join {} due to join-side key type mismatch (left/right key field types differ)",
207+
executor_id);
208+
}
209+
210+
auto runtimeFilterList = enableRuntimeFilter
211+
? tiflash_join.genRuntimeFilterList(context, build_source_columns, build_key_names_map, log)
212+
: std::vector<RuntimeFilterPtr>{};
213+
LOG_DEBUG(log, "before register runtime filter list, list size:{}", runtimeFilterList.size());
214+
context.getDAGContext()->runtime_filter_mgr.registerRuntimeFilterList(runtimeFilterList);
165215

166216
JoinPtr join_ptr = std::make_shared<Join>(
167217
probe_key_names,
@@ -189,7 +239,7 @@ PhysicalPlanNodePtr PhysicalJoin::build(
189239
flag_mapped_entry_helper_name,
190240
settings.join_probe_cache_columns_threshold,
191241
context.isTest(),
192-
runtime_filter_list);
242+
runtimeFilterList);
193243

194244
recordJoinExecuteInfo(dag_context, executor_id, build_plan->execId(), join_ptr);
195245

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright 2023 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <Debug/MockRuntimeFilter.h>
16+
#include <Interpreters/Context.h>
17+
#include <TestUtils/ExecutorTestUtils.h>
18+
19+
namespace DB
20+
{
21+
namespace tests
22+
{
23+
24+
/// Coverage for the conservative guard in `PhysicalJoin::build`:
25+
/// - If join-side key protobuf field types are not compatible, we should skip creating/registering runtime filter.
26+
///
27+
/// Behavioral contract in this test:
28+
/// - We still attach a RuntimeFilter request in the mock DAG.
29+
/// - Because key types mismatch, the optimization should be disabled (i.e. no filtering happens).
30+
/// - The join result should remain correct (same row count as “without runtime filter”).
31+
class RuntimeFilterDisableOnTypeMismatchTestRunner : public DB::tests::ExecutorTest
32+
{
33+
public:
34+
void initializeContext() override
35+
{
36+
ExecutorTest::initializeContext();
37+
context.mockStorage()->setUseDeltaMerge(true);
38+
}
39+
40+
static constexpr size_t concurrency = 10;
41+
};
42+
43+
// Opens a loop that runs the enclosed test body twice: first with the pipeline
// execution model disabled, then with it enabled. Inside the body the current
// mode is available as `enable_pipeline`.
//
// The loop variable must NOT be called `enablePipeline`: that would shadow the
// `enablePipeline(bool)` function invoked below and turn the call into an
// attempt to call a bool-like value.
//
// Must be closed with WrapForRuntimeFilterTestEnd.
#define WrapForRuntimeFilterTestBegin             \
    std::vector<bool> pipelineBools{false, true}; \
    for (auto enable_pipeline : pipelineBools)    \
    {                                             \
        enablePipeline(enable_pipeline);

// Closes the loop opened by WrapForRuntimeFilterTestBegin.
#define WrapForRuntimeFilterTestEnd }
50+
51+
TEST_F(RuntimeFilterDisableOnTypeMismatchTestRunner, DisableRuntimeFilterWhenJoinKeyFieldTypeMismatch)
52+
try
53+
{
54+
context.context->getSettingsRef().dt_segment_stable_pack_rows = 1;
55+
context.context->getSettingsRef().dt_segment_limit_rows = 1;
56+
context.context->getSettingsRef().dt_segment_delta_cache_limit_rows = 1;
57+
context.context->getSettingsRef().dt_segment_force_split_size = 70;
58+
context.context->getSettingsRef().enable_hash_join_v2 = false;
59+
60+
// Probe(left) join key: Int32
61+
context.addMockDeltaMerge(
62+
{"test_db", "left_table"},
63+
{{"k1", TiDB::TP::TypeLong}},
64+
{toNullableVec<Int32>("k1", {1, 2, 3})},
65+
concurrency);
66+
67+
// Build(right) join key: Int64 (mismatch)
68+
context.addExchangeReceiver(
69+
"right_exchange_table_i64",
70+
{{"k1", TiDB::TP::TypeLongLong}},
71+
{toNullableVec<Int64>("k1", {2, 2, 3, 4})});
72+
73+
// Build(right) join key: UInt32 (mismatch due to unsigned flag)
74+
context.addExchangeReceiver(
75+
"right_exchange_table_u32",
76+
{{"k1", TiDB::TP::TypeLong, true}},
77+
{toNullableVec<UInt32>("k1", {2, 2, 3, 4})});
78+
79+
WrapForRuntimeFilterTestBegin
80+
{
81+
// Baseline: without runtime filter.
82+
auto request
83+
= context.scan("test_db", "left_table")
84+
.join(context.receive("right_exchange_table_i64"), tipb::JoinType::TypeInnerJoin, {col("k1")})
85+
.build(context);
86+
Expect expect{
87+
{"table_scan_0", {3, enable_pipeline ? concurrency : 1}},
88+
{"exchange_receiver_1", {4, concurrency}},
89+
{"Join_2", {2, concurrency}}};
90+
testForExecutionSummary(request, expect);
91+
}
92+
93+
{
94+
// With runtime filter requested, but type mismatch => runtime filter should be disabled.
95+
mock::MockRuntimeFilter rf(1, col("k1"), col("k1"), "exchange_receiver_1", "table_scan_0");
96+
auto request
97+
= context.scan("test_db", "left_table", std::vector<int>{1})
98+
.join(context.receive("right_exchange_table_i64"), tipb::JoinType::TypeInnerJoin, {col("k1")}, rf)
99+
.build(context);
100+
// Expect no RF pruning, same as baseline.
101+
Expect expect{
102+
{"table_scan_0", {3, enable_pipeline ? concurrency : 1}},
103+
{"exchange_receiver_1", {4, concurrency}},
104+
{"Join_2", {2, concurrency}}};
105+
testForExecutionSummary(request, expect);
106+
}
107+
108+
{
109+
// With runtime filter requested, but signed/unsigned mismatch => runtime filter should be disabled.
110+
mock::MockRuntimeFilter rf(1, col("k1"), col("k1"), "exchange_receiver_1", "table_scan_0");
111+
auto request
112+
= context.scan("test_db", "left_table", std::vector<int>{1})
113+
.join(context.receive("right_exchange_table_u32"), tipb::JoinType::TypeInnerJoin, {col("k1")}, rf)
114+
.build(context);
115+
// Expect no RF pruning, same as baseline.
116+
Expect expect{
117+
{"table_scan_0", {3, enable_pipeline ? concurrency : 1}},
118+
{"exchange_receiver_1", {4, concurrency}},
119+
{"Join_2", {2, concurrency}}};
120+
testForExecutionSummary(request, expect);
121+
}
122+
WrapForRuntimeFilterTestEnd
123+
}
124+
CATCH
125+
126+
#undef WrapForRuntimeFilterTestBegin
127+
#undef WrapForRuntimeFilterTestEnd
128+
129+
} // namespace tests
130+
} // namespace DB

0 commit comments

Comments
 (0)