Skip to content

Commit 0a42273

Browse files
starocean999zclllyybb
authored andcommitted
[feature](join) support ASOF join (apache#59591)
apache/doris-website#3290 docs here
1 parent 5b88188 commit 0a42273

94 files changed

Lines changed: 6144 additions & 343 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

be/src/common/status.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,14 @@ using ResultError = unexpected<Status>;
757757
std::forward<T>(res).value(); \
758758
})
759759

760+
// core in Debug mode, exception in Release mode.
761+
#define DORIS_CHECK(stmt) \
762+
do { \
763+
if (!static_cast<bool>(stmt)) [[unlikely]] { \
764+
throw Exception(Status::FatalError(fmt::format("Check failed: {}", #stmt))); \
765+
} \
766+
} while (false)
767+
760768
} // namespace doris
761769

762770
// specify formatter for Status

be/src/pipeline/common/join_utils.h

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,35 @@
1717

1818
#pragma once
1919

20+
#include <algorithm>
2021
#include <variant>
2122

2223
#include "vec/common/hash_table/hash_key_type.h"
2324
#include "vec/common/hash_table/hash_map_context.h"
2425
#include "vec/common/hash_table/join_hash_table.h"
2526

2627
namespace doris {
28+
29+
// Devirtualize compare_at for ASOF JOIN supported column types.
30+
// ASOF JOIN only supports DateV2, DateTimeV2, and TimestampTZ.
31+
// Dispatches to the concrete ColumnVector<T> once so that all compare_at
32+
// calls inside `func` are direct (non-virtual) calls.
33+
// `func` receives a single argument: a const pointer to the concrete column
34+
// (or const IColumn* as fallback for unexpected types).
35+
template <typename Func>
36+
decltype(auto) asof_column_dispatch(const vectorized::IColumn* col, Func&& func) {
37+
if (const auto* c_dv2 = vectorized::check_and_get_column<vectorized::ColumnDateV2>(col)) {
38+
return std::forward<Func>(func)(c_dv2);
39+
} else if (const auto* c_dtv2 =
40+
vectorized::check_and_get_column<vectorized::ColumnDateTimeV2>(col)) {
41+
return std::forward<Func>(func)(c_dtv2);
42+
} else if (const auto* c_tstz =
43+
vectorized::check_and_get_column<vectorized::ColumnTimeStampTz>(col)) {
44+
return std::forward<Func>(func)(c_tstz);
45+
} else {
46+
return std::forward<Func>(func)(col);
47+
}
48+
}
2749
using JoinOpVariants =
2850
std::variant<std::integral_constant<TJoinOp::type, TJoinOp::INNER_JOIN>,
2951
std::integral_constant<TJoinOp::type, TJoinOp::LEFT_SEMI_JOIN>,
@@ -35,7 +57,20 @@ using JoinOpVariants =
3557
std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_SEMI_JOIN>,
3658
std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_ANTI_JOIN>,
3759
std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>,
38-
std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;
60+
std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>,
61+
std::integral_constant<TJoinOp::type, TJoinOp::ASOF_LEFT_INNER_JOIN>,
62+
std::integral_constant<TJoinOp::type, TJoinOp::ASOF_LEFT_OUTER_JOIN>>;
63+
64+
inline bool is_asof_join(TJoinOp::type join_op) {
65+
return join_op == TJoinOp::ASOF_LEFT_INNER_JOIN || join_op == TJoinOp::ASOF_LEFT_OUTER_JOIN;
66+
}
67+
68+
template <int JoinOpType>
69+
inline constexpr bool is_asof_join_op_v =
70+
JoinOpType == TJoinOp::ASOF_LEFT_INNER_JOIN || JoinOpType == TJoinOp::ASOF_LEFT_OUTER_JOIN;
71+
72+
template <int JoinOpType>
73+
inline constexpr bool is_asof_outer_join_op_v = JoinOpType == TJoinOp::ASOF_LEFT_OUTER_JOIN;
3974

4075
template <class T>
4176
using PrimaryTypeHashTableContext =
@@ -219,4 +254,99 @@ inline void try_convert_to_direct_mapping(
219254
primary_to_direct_mapping(context, key_columns, variant_ptrs);
220255
}
221256

257+
// ASOF JOIN index with inline values for cache-friendly branchless binary search.
258+
// IntType is the integer representation of the ASOF column value:
259+
// uint32_t for DateV2, uint64_t for DateTimeV2 and TimestampTZ.
260+
// Rows are sorted by asof_value during build, then materialized into SoA arrays
261+
// so probe-side binary search only touches the ASOF values hot path.
262+
template <typename IntType>
263+
struct AsofIndexGroup {
264+
using int_type = IntType;
265+
266+
struct Entry {
267+
IntType asof_value;
268+
uint32_t row_index; // 1-based, 0 = invalid/padding
269+
};
270+
271+
std::vector<Entry> entries;
272+
std::vector<IntType> asof_values;
273+
std::vector<uint32_t> row_indexes;
274+
275+
void add_row(IntType value, uint32_t row_idx) { entries.push_back({value, row_idx}); }
276+
277+
void sort_and_finalize() {
278+
if (entries.empty()) {
279+
return;
280+
}
281+
if (entries.size() > 1) {
282+
pdqsort(entries.begin(), entries.end(),
283+
[](const Entry& a, const Entry& b) { return a.asof_value < b.asof_value; });
284+
}
285+
286+
asof_values.resize(entries.size());
287+
row_indexes.resize(entries.size());
288+
for (size_t i = 0; i < entries.size(); ++i) {
289+
asof_values[i] = entries[i].asof_value;
290+
row_indexes[i] = entries[i].row_index;
291+
}
292+
293+
std::vector<Entry>().swap(entries);
294+
}
295+
296+
const IntType* values_data() const { return asof_values.data(); }
297+
298+
// Branchless lower_bound: first i where asof_values[i] >= target
299+
ALWAYS_INLINE size_t lower_bound(IntType target) const {
300+
size_t lo = 0, n = asof_values.size();
301+
while (n > 1) {
302+
size_t half = n / 2;
303+
lo += half * (asof_values[lo + half] < target);
304+
n -= half;
305+
}
306+
if (lo < asof_values.size()) {
307+
lo += (asof_values[lo] < target);
308+
}
309+
return lo;
310+
}
311+
312+
// Branchless upper_bound: first i where asof_values[i] > target
313+
ALWAYS_INLINE size_t upper_bound(IntType target) const {
314+
size_t lo = 0, n = asof_values.size();
315+
while (n > 1) {
316+
size_t half = n / 2;
317+
lo += half * (asof_values[lo + half] <= target);
318+
n -= half;
319+
}
320+
if (lo < asof_values.size()) {
321+
lo += (asof_values[lo] <= target);
322+
}
323+
return lo;
324+
}
325+
326+
// Semantics by (is_greater, is_strict):
327+
// (true, false): probe >= build -> find largest build value <= probe
328+
// (true, true): probe > build -> find largest build value < probe
329+
// (false, false): probe <= build -> find smallest build value >= probe
330+
// (false, true): probe < build -> find smallest build value > probe
331+
// Returns the build row index of the best match, or 0 if no match.
332+
template <bool IsGreater, bool IsStrict>
333+
ALWAYS_INLINE uint32_t find_best_match(IntType probe_value) const {
334+
if (asof_values.empty()) {
335+
return 0;
336+
}
337+
if constexpr (IsGreater) {
338+
size_t pos = IsStrict ? lower_bound(probe_value) : upper_bound(probe_value);
339+
return pos > 0 ? row_indexes[pos - 1] : 0;
340+
} else {
341+
size_t pos = IsStrict ? upper_bound(probe_value) : lower_bound(probe_value);
342+
return pos < asof_values.size() ? row_indexes[pos] : 0;
343+
}
344+
}
345+
};
346+
347+
// Type-erased container for all ASOF index groups.
348+
// DateV2 -> uint32_t, DateTimeV2/TimestampTZ -> uint64_t.
349+
using AsofIndexVariant = std::variant<std::monostate, std::vector<AsofIndexGroup<uint32_t>>,
350+
std::vector<AsofIndexGroup<uint64_t>>>;
351+
222352
} // namespace doris

be/src/pipeline/dependency.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,18 @@ struct HashJoinSharedState : public JoinSharedState {
631631
// local filter will always be applied, and in filter could guarantee precise filtering
632632
// ATTN: we should disable always_true logic for in filter when we set this flag
633633
bool left_semi_direct_return = false;
634+
635+
// ASOF JOIN specific fields
636+
// Whether the inequality is >= or > (true) vs <= or < (false)
637+
bool asof_inequality_is_greater = true;
638+
// Whether the inequality is strict (> or <) vs non-strict (>= or <=)
639+
bool asof_inequality_is_strict = false;
640+
641+
// ASOF JOIN pre-sorted index with inline values for O(log K) branchless lookup
642+
// Typed AsofIndexGroups stored in a variant (uint32_t for DateV2, uint64_t for DateTimeV2/TimestampTZ)
643+
AsofIndexVariant asof_index_groups;
644+
// build_row_index -> bucket_id for O(1) reverse lookup
645+
std::vector<uint32_t> asof_build_row_to_bucket;
634646
};
635647

636648
struct PartitionedHashJoinSharedState

0 commit comments

Comments
 (0)