Skip to content

Commit 91d4f88

Browse files
committed
executor: disable runtime filter on join key type mismatch
1 parent 01b12dd commit 91d4f88

2 files changed

Lines changed: 180 additions & 3 deletions

File tree

dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <Flash/Coprocessor/DAGContext.h>
2222
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
2323
#include <Flash/Coprocessor/DAGPipeline.h>
24+
#include <Flash/Coprocessor/DAGUtils.h>
2425
#include <Flash/Coprocessor/InterpreterUtils.h>
2526
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
2627
#include <Flash/Pipeline/PipelineBuilder.h>
@@ -32,7 +33,6 @@
3233
#include <Interpreters/Context.h>
3334
#include <common/logger_useful.h>
3435
#include <fmt/format.h>
35-
3636
namespace DB
3737
{
3838
namespace FailPoints
@@ -158,8 +158,58 @@ PhysicalPlanNodePtr PhysicalJoin::build(
158158
{
159159
build_key_names_map[original_build_key_names[i]] = build_key_names[i];
160160
}
161-
auto runtime_filter_list
162-
= tiflash_join.genRuntimeFilterList(context, build_source_columns, build_key_names_map, log);
161+
162+
// Conservative correctness guard:
163+
// If join key *protobuf field types* across sides are not compatible, skip runtime filter as early as possible
164+
// to avoid wrong filtering / wasted work.
165+
//
166+
// Why here:
167+
// - We haven't created/registered any RuntimeFilter yet.
168+
// - We still have access to `tipb::Join` and can cheaply check original field types.
169+
//
170+
// NOTE: This is intentionally conservative. Join itself will still work because join keys are cast to a common
171+
// type for execution, but RF's Set header/value normalization may not be safe under mismatched signed/unsigned
172+
// or integer/decimal scenarios.
173+
auto is_join_key_field_type_compatible = [&]() -> bool {
174+
const int n = join.left_join_keys_size();
175+
if (n != join.right_join_keys_size())
176+
return false;
177+
for (int i = 0; i < n; ++i)
178+
{
179+
if (unlikely(
180+
!exprHasValidFieldType(join.left_join_keys(i)) || !exprHasValidFieldType(join.right_join_keys(i))))
181+
return false;
182+
183+
const auto & lt = join.left_join_keys(i).field_type();
184+
const auto & rt = join.right_join_keys(i).field_type();
185+
186+
// If TiDB says the two sides are different basic tp, we don't try to be smart here.
187+
if (lt.tp() != rt.tp())
188+
return false;
189+
190+
// Signed/unsigned mismatch: when the tp is integer-like, TiDB encodes unsigned via flag.
191+
// This is the known problematic case for RF(IN).
192+
if (hasUnsignedFlag(lt) != hasUnsignedFlag(rt))
193+
return false;
194+
195+
// Otherwise keep conservative: if we got here (same tp + same unsigned flag), treat as compatible.
196+
continue;
197+
}
198+
return true;
199+
};
200+
201+
const bool enable_runtime_filter = is_join_key_field_type_compatible();
202+
if (!enable_runtime_filter && !join.runtime_filter_list().empty())
203+
{
204+
LOG_INFO(
205+
log,
206+
"Disable runtime filter for join {} due to join-side key type mismatch (left/right key field types differ)",
207+
executor_id);
208+
}
209+
210+
auto runtime_filter_list = enable_runtime_filter
211+
? tiflash_join.genRuntimeFilterList(context, build_source_columns, build_key_names_map, log)
212+
: std::vector<RuntimeFilterPtr>{};
163213
LOG_DEBUG(log, "before register runtime filter list, list size:{}", runtime_filter_list.size());
164214
context.getDAGContext()->runtime_filter_mgr.registerRuntimeFilterList(runtime_filter_list);
165215

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Copyright 2023 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <Debug/MockRuntimeFilter.h>
16+
#include <Interpreters/Context.h>
17+
#include <TestUtils/ExecutorTestUtils.h>
18+
19+
namespace DB
20+
{
21+
namespace tests
22+
{
23+
24+
/// Coverage for the conservative guard in `PhysicalJoin::build`:
25+
/// - If join-side key protobuf field types are not compatible, we should skip creating/registering runtime filter.
26+
///
27+
/// Behavioral contract in this test:
28+
/// - We still attach a RuntimeFilter request in the mock DAG.
29+
/// - Because key types mismatch, the optimization should be disabled (i.e. no filtering happens).
30+
/// - The join result should remain correct (same row count as “without runtime filter”).
31+
class RuntimeFilterDisableOnTypeMismatchTestRunner : public DB::tests::ExecutorTest
32+
{
33+
public:
34+
void initializeContext() override
35+
{
36+
ExecutorTest::initializeContext();
37+
context.mockStorage()->setUseDeltaMerge(true);
38+
}
39+
40+
static constexpr size_t concurrency = 10;
41+
};
42+
43+
#define WRAP_FOR_RF_TEST_BEGIN \
44+
std::vector<bool> pipeline_bools{false, true}; \
45+
for (auto enable_pipeline : pipeline_bools) \
46+
{ \
47+
enablePipeline(enable_pipeline);
48+
49+
#define WRAP_FOR_RF_TEST_END }
50+
51+
TEST_F(RuntimeFilterDisableOnTypeMismatchTestRunner, DisableRuntimeFilterWhenJoinKeyFieldTypeMismatch)
52+
try
53+
{
54+
context.context->getSettingsRef().dt_segment_stable_pack_rows = 1;
55+
context.context->getSettingsRef().dt_segment_limit_rows = 1;
56+
context.context->getSettingsRef().dt_segment_delta_cache_limit_rows = 1;
57+
context.context->getSettingsRef().dt_segment_force_split_size = 70;
58+
context.context->getSettingsRef().enable_hash_join_v2 = false;
59+
60+
// Probe(left) join key: Int32
61+
context.addMockDeltaMerge(
62+
{"test_db", "left_table"},
63+
{{"k1", TiDB::TP::TypeLong}},
64+
{toNullableVec<Int32>("k1", {1, 2, 3})},
65+
concurrency);
66+
67+
// Build(right) join key: Int64 (mismatch)
68+
context.addExchangeReceiver(
69+
"right_exchange_table_i64",
70+
{{"k1", TiDB::TP::TypeLongLong}},
71+
{toNullableVec<Int64>("k1", {2, 2, 3, 4})});
72+
73+
// Build(right) join key: UInt32 (mismatch due to unsigned flag)
74+
context.addExchangeReceiver(
75+
"right_exchange_table_u32",
76+
{{"k1", TiDB::TP::TypeLong, true}},
77+
{toNullableVec<UInt32>("k1", {2, 2, 3, 4})});
78+
79+
WRAP_FOR_RF_TEST_BEGIN
80+
{
81+
// Baseline: without runtime filter.
82+
auto request = context.scan("test_db", "left_table")
83+
.join(context.receive("right_exchange_table_i64"), tipb::JoinType::TypeInnerJoin, {col("k1")})
84+
.build(context);
85+
Expect expect{
86+
{"table_scan_0", {3, enable_pipeline ? concurrency : 1}},
87+
{"exchange_receiver_1", {4, concurrency}},
88+
{"Join_2", {2, concurrency}}};
89+
testForExecutionSummary(request, expect);
90+
}
91+
92+
{
93+
// With runtime filter requested, but type mismatch => runtime filter should be disabled.
94+
mock::MockRuntimeFilter rf(1, col("k1"), col("k1"), "exchange_receiver_1", "table_scan_0");
95+
auto request = context.scan("test_db", "left_table", std::vector<int>{1})
96+
.join(context.receive("right_exchange_table_i64"), tipb::JoinType::TypeInnerJoin, {col("k1")}, rf)
97+
.build(context);
98+
// Expect no RF pruning, same as baseline.
99+
Expect expect{
100+
{"table_scan_0", {3, enable_pipeline ? concurrency : 1}},
101+
{"exchange_receiver_1", {4, concurrency}},
102+
{"Join_2", {2, concurrency}}};
103+
testForExecutionSummary(request, expect);
104+
}
105+
106+
{
107+
// With runtime filter requested, but signed/unsigned mismatch => runtime filter should be disabled.
108+
mock::MockRuntimeFilter rf(1, col("k1"), col("k1"), "exchange_receiver_1", "table_scan_0");
109+
auto request = context.scan("test_db", "left_table", std::vector<int>{1})
110+
.join(context.receive("right_exchange_table_u32"), tipb::JoinType::TypeInnerJoin, {col("k1")}, rf)
111+
.build(context);
112+
// Expect no RF pruning, same as baseline.
113+
Expect expect{
114+
{"table_scan_0", {3, enable_pipeline ? concurrency : 1}},
115+
{"exchange_receiver_1", {4, concurrency}},
116+
{"Join_2", {2, concurrency}}};
117+
testForExecutionSummary(request, expect);
118+
}
119+
WRAP_FOR_RF_TEST_END
120+
}
121+
CATCH
122+
123+
#undef WRAP_FOR_RF_TEST_BEGIN
124+
#undef WRAP_FOR_RF_TEST_END
125+
126+
} // namespace tests
127+
} // namespace DB

0 commit comments

Comments
 (0)