Skip to content
2 changes: 1 addition & 1 deletion src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1930,7 +1930,7 @@ ClickHouse applies this setting when the query contains the product of object st
Possible values:

- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.`
- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` The right table is executed first and is added to the secondary query as a temporary table.
- `allow` — Default value. Allows the use of these types of subqueries.
)", 0) \
\
Expand Down
82 changes: 63 additions & 19 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Planner/Utils.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/extractTableFunctionFromSelectQuery.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Planner/Utils.h>
Expand Down Expand Up @@ -115,11 +117,14 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
using Base::Base;

explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, ContextPtr context) : Base(context), types(types_) {}
explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, size_t entry_, ContextPtr context)
: Base(context)
, types(types_)
, entry(entry_) {}

bool needChildVisit(QueryTreeNodePtr & /*parent*/, QueryTreeNodePtr & /*child*/)
{
return getSubqueryDepth() <= 2 && !passed_node;
return getSubqueryDepth() <= 2 && !passed_node && !current_entry;
}

void enterImpl(QueryTreeNodePtr & node)
Expand All @@ -130,13 +135,19 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
auto node_type = node->getNodeType();

if (types.contains(node_type))
passed_node = node;
{
++current_entry;
if (current_entry == entry)
passed_node = node;
}
}

QueryTreeNodePtr getNode() const { return passed_node; }

private:
std::unordered_set<QueryTreeNodeType> types;
size_t entry;
size_t current_entry = 0;
QueryTreeNodePtr passed_node;
};

Expand Down Expand Up @@ -203,33 +214,42 @@ Converts
localtable as t
ON s3.key == t.key

to
to (object_storage_cluster_join_mode='local')

SELECT s3.c1, s3.c2, s3.key
FROM
s3Cluster(...) AS s3

or (object_storage_cluster_join_mode='global')

SELECT s3.c1, s3.c2, t.c3
FROM
s3Cluster(...) as s3
JOIN
values('key UInt32, data String', (1, 'one'), (2, 'two'), ...) as t
ON s3.key == t.key
*/
void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
ASTPtr & query_to_send,
QueryTreeNodePtr query_tree,
SelectQueryInfo query_info,
const ContextPtr & context)
{
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
switch (object_storage_cluster_join_mode)
{
case ObjectStorageClusterJoinMode::LOCAL:
{
auto info = getQueryTreeInfo(query_tree, context);
auto info = getQueryTreeInfo(query_info.query_tree, context);

if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
{
auto modified_query_tree = query_tree->clone();
auto modified_query_tree = query_info.query_tree->clone();

SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context);
left_table_expression_searcher.visit(modified_query_tree);
auto table_function_node = left_table_expression_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find left table function node");

QueryTreeNodePtr query_tree_distributed;

Expand All @@ -242,7 +262,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
}
else if (info.has_cross_join)
{
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context);
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, 1, context);
join_searcher.visit(modified_query_tree);
auto cross_join_node = join_searcher.getNode();
if (!cross_join_node)
Expand Down Expand Up @@ -297,8 +317,24 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
return;
}
case ObjectStorageClusterJoinMode::GLOBAL:
// TODO
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now");
{
auto info = getQueryTreeInfo(query_info.query_tree, context);

if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
{
auto modified_query_tree = query_info.query_tree->clone();

rewriteJoinToGlobalJoin(modified_query_tree, context);
modified_query_tree = buildQueryTreeForShard(
query_info.planner_context,
modified_query_tree,
/*allow_global_join_for_right_table*/ true,
/*find_cross_join*/ true);
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
}

return;
}
case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special
return;
}
Expand Down Expand Up @@ -336,7 +372,7 @@ void IStorageCluster::read(
SharedHeader sample_block;
ASTPtr query_to_send = query_info.query;

updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);
updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context);

if (settings[Setting::allow_experimental_analyzer])
{
Expand Down Expand Up @@ -374,6 +410,10 @@ void IStorageCluster::read(

auto this_ptr = std::static_pointer_cast<IStorageCluster>(shared_from_this());

std::optional<Tables> external_tables = std::nullopt;
if (query_info.planner_context && query_info.planner_context->getMutableQueryContext())
external_tables = query_info.planner_context->getMutableQueryContext()->getExternalTables();

auto reading = std::make_unique<ReadFromCluster>(
column_names,
query_info,
Expand All @@ -384,7 +424,8 @@ void IStorageCluster::read(
std::move(query_to_send),
processed_stage,
cluster,
log);
log,
external_tables);

query_plan.addStep(std::move(reading));
}
Expand Down Expand Up @@ -502,7 +543,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),
external_tables.has_value() ? *external_tables : Tables(),
processed_stage,
nullptr,
RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)});
Expand Down Expand Up @@ -540,7 +581,7 @@ IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePt
info.has_cross_join = true;
}

SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context);
left_table_expression_searcher.visit(query_tree);
auto table_function_node = left_table_expression_searcher.getNode();
if (!table_function_node)
Expand Down Expand Up @@ -575,9 +616,12 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

auto info = getQueryTreeInfo(query_info.query_tree, context);
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
if (object_storage_cluster_join_mode == ObjectStorageClusterJoinMode::LOCAL)
{
auto info = getQueryTreeInfo(query_info.query_tree, context);
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
}
}

/// Initiator executes query on remote node.
Expand Down
7 changes: 5 additions & 2 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class IStorageCluster : public IStorage

protected:
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context);
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, SelectQueryInfo query_info, const ContextPtr & context);

virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}

Expand Down Expand Up @@ -137,7 +137,8 @@ class ReadFromCluster : public SourceStepWithFilter
ASTPtr query_to_send_,
QueryProcessingStage::Enum processed_stage_,
ClusterPtr cluster_,
LoggerPtr log_)
LoggerPtr log_,
std::optional<Tables> external_tables_)
: SourceStepWithFilter(
std::move(sample_block),
column_names_,
Expand All @@ -149,6 +150,7 @@ class ReadFromCluster : public SourceStepWithFilter
, processed_stage(processed_stage_)
, cluster(std::move(cluster_))
, log(log_)
, external_tables(external_tables_)
{
}

Expand All @@ -160,6 +162,7 @@ class ReadFromCluster : public SourceStepWithFilter
LoggerPtr log;

std::optional<RemoteQueryExecutor::Extension> extension;
std::optional<Tables> external_tables;

void createExtension(const ActionsDAG::Node * predicate);
ContextPtr updateSettings(const Settings & settings);
Expand Down
43 changes: 37 additions & 6 deletions src/Storages/buildQueryTreeForShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace Setting
extern const SettingsBool prefer_global_in_and_join;
extern const SettingsBool enable_add_distinct_to_in_subqueries;
extern const SettingsInt64 optimize_const_name_size;
extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
}

namespace ErrorCodes
Expand Down Expand Up @@ -120,8 +121,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
using Base = InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>;
using Base::Base;

explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_)
explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_, bool find_cross_join_)
: Base(context_)
, find_cross_join(find_cross_join_)
{}

struct InFunctionOrJoin
Expand Down Expand Up @@ -157,9 +159,11 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
{
auto * function_node = node->as<FunctionNode>();
auto * join_node = node->as<JoinNode>();
CrossJoinNode * cross_join_node = find_cross_join ? node->as<CrossJoinNode>() : nullptr;

if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) ||
(join_node && join_node->getLocality() == JoinLocality::Global))
(join_node && join_node->getLocality() == JoinLocality::Global) ||
cross_join_node)
{
InFunctionOrJoin in_function_or_join_entry;
in_function_or_join_entry.query_node = node;
Expand Down Expand Up @@ -223,7 +227,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers);
replacement_map.emplace(table_node.get(), std::move(replacement_table_expression));
}
else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings()[Setting::prefer_global_in_and_join]) &&
else if ((distributed_product_mode == DistributedProductMode::GLOBAL ||
getSettings()[Setting::prefer_global_in_and_join] ||
(find_cross_join && getSettings()[Setting::object_storage_cluster_join_mode] == ObjectStorageClusterJoinMode::GLOBAL)) &&
!in_function_or_join_stack.empty())
{
auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get();
Expand Down Expand Up @@ -257,6 +263,8 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
std::vector<InFunctionOrJoin> in_function_or_join_stack;
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
std::vector<InFunctionOrJoin> global_in_or_join_nodes;

bool find_cross_join = false;
};

/** Replaces large constant values with `__getScalar` function calls to avoid
Expand Down Expand Up @@ -504,14 +512,18 @@ QueryTreeNodePtr getSubqueryFromTableExpression(

}

QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table)
QueryTreeNodePtr buildQueryTreeForShard(
const PlannerContextPtr & planner_context,
QueryTreeNodePtr query_tree_to_modify,
bool allow_global_join_for_right_table,
bool find_cross_join)
{
CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor;
collect_column_source_to_columns_visitor.visit(query_tree_to_modify);

const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns();

DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext());
DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext(), find_cross_join);
visitor.visit(query_tree_to_modify);

auto replacement_map = visitor.getReplacementMap();
Expand Down Expand Up @@ -550,6 +562,24 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex
replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node));
continue;
}
if (auto * cross_join_node = global_in_or_join_node.query_node->as<CrossJoinNode>())
{
auto tables_count = cross_join_node->getTableExpressions().size();
for (size_t i = 1; i < tables_count; ++i)
{
QueryTreeNodePtr join_table_expression = cross_join_node->getTableExpressions()[i];

auto subquery_node = getSubqueryFromTableExpression(join_table_expression, column_source_to_columns, planner_context->getQueryContext());

auto temporary_table_expression_node = executeSubqueryNode(subquery_node,
planner_context->getMutableQueryContext(),
global_in_or_join_node.subquery_depth);
temporary_table_expression_node->setAlias(join_table_expression->getAlias());

replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node));
}
continue;
}
if (auto * in_function_node = global_in_or_join_node.query_node->as<FunctionNode>())
{
auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1);
Expand Down Expand Up @@ -661,7 +691,8 @@ class RewriteJoinToGlobalJoinVisitor : public InDepthQueryTreeVisitorWithContext
{
if (auto * join_node = node->as<JoinNode>())
{
bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join];
bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join]
&& getContext()->getSettingsRef()[Setting::object_storage_cluster_join_mode] != ObjectStorageClusterJoinMode::GLOBAL;
Comment on lines +694 to +695

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep local-join preference outside object-storage mode

rewriteJoinToGlobalJoin is also used by parallel-replica planning (src/Planner/findParallelReplicasQuery.cpp and src/Interpreters/ClusterProxy/executeQuery.cpp), so tying prefer_local_join to object_storage_cluster_join_mode here makes unrelated non-object-storage queries ignore parallel_replicas_prefer_local_join whenever the session sets object_storage_cluster_join_mode='global'. That forces extra GLOBAL JOIN rewrites in those flows and can significantly increase broadcast/memory costs; this override should be scoped to the object-storage cluster path instead of the shared visitor.

Useful? React with 👍 / 👎.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, object_storage_cluster_join_mode=global has priority and overrides the parallel_replicas_prefer_local_join setting here.

bool should_use_global_join = !prefer_local_join || !allStoragesAreMergeTree(join_node->getRightTableExpression());
if (should_use_global_join)
join_node->setLocality(JoinLocality::Global);
Expand Down
6 changes: 5 additions & 1 deletion src/Storages/buildQueryTreeForShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ using PlannerContextPtr = std::shared_ptr<PlannerContext>;
class Context;
using ContextPtr = std::shared_ptr<const Context>;

QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table);
QueryTreeNodePtr buildQueryTreeForShard(
const PlannerContextPtr & planner_context,
QueryTreeNodePtr query_tree_to_modify,
bool allow_global_join_for_right_table,
bool find_cross_join = false);

void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr context);

Expand Down
Loading
Loading