Skip to content

Antalya 26.3: Cluster Joins part 2 - global mode#1782

Open
ianton-ru wants to merge 5 commits into
antalya-26.3from
frontport/antalya-26.3/json_part2
Open

Antalya 26.3: Cluster Joins part 2 - global mode#1782
ianton-ru wants to merge 5 commits into
antalya-26.3from
frontport/antalya-26.3/json_part2

Conversation

@ianton-ru
Copy link
Copy Markdown

@ianton-ru ianton-ru commented May 12, 2026

Changelog category (leave one):

  • New Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Cluster Joins part 2 - global mode

Documentation entry for user-facing changes

Frontport of #1527

Setting object_storage_cluster_join_mode wiith value global.
In queries like

SELECT * FROM iceberg_table(...) JOIN local_table(...) ON ...

when left table is executed on cluster (s3Cluster, Iceberg with object_storage_cluster setting, etc.) data from right table is extracted and sent to swarm nodes as temorary tables. JOIN is executed on swarm nodes.

This PR also includes several fixes for issues, found by AI

These changes are in last three commits, and new for 26.3 port, do not exists in #1527 for 26.1.

CI/CD Options

Exclude tests:

  • Fast test
  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Performance tests
  • All with ASAN
  • All with TSAN
  • All with MSAN
  • All with UBSAN
  • All with Coverage
  • All with Aarch64
  • All Regression
  • Disable CI Cache

Regression jobs to run:

  • Fast suites (mostly <1h)
  • Aggregate Functions (2h)
  • Alter (1.5h)
  • Benchmark (30m)
  • ClickHouse Keeper (1h)
  • Iceberg (2h)
  • LDAP (1h)
  • Parquet (1.5h)
  • RBAC (1.5h)
  • SSL Server (1h)
  • S3 (2h)
  • S3 Export (2h)
  • Swarms (30m)
  • Tiered Storage (2h)

@ianton-ru ianton-ru added antalya port-antalya PRs to be ported to all new Antalya releases antalya-26.3 labels May 12, 2026
@ianton-ru
Copy link
Copy Markdown
Author

@codex review

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 12, 2026

Workflow [PR], commit [a8dbd32]

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 01d8b03ac1

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread src/Storages/IStorageCluster.cpp
Comment thread src/Storages/buildQueryTreeForShard.cpp
@svb-alt svb-alt added the forwardport This is a frontport of code that existed in previous Antalya versions label May 12, 2026
@svb-alt svb-alt requested a review from arthurpassos May 14, 2026 12:12
arthurpassos
arthurpassos previously approved these changes May 15, 2026
Copy link
Copy Markdown
Collaborator

@arthurpassos arthurpassos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with this piece of code, I highly doubt I can add any value here. The code looks sane, and the tests as well.

@alsugiliazova
Copy link
Copy Markdown
Member

Audit: PR #1782 — Antalya 26.3: Cluster Joins part 2 - global mode

AI audit note: This review comment was generated by AI (Cursor agent, audit-review skill).

Scope reviewed

Head b63ad834 against base 918a26e6 on antalya-26.3. Feature scope: frontport of #1527 (cluster joins, GLOBAL mode for object_storage_cluster_join_mode) plus three 26.3-only follow-up fixes (Fix global IN, Fix cross join replacement, Fix rewriteInToGlobalIn). In-scope files: src/Core/Settings.cpp, src/Storages/IStorageCluster.{cpp,h}, src/Storages/buildQueryTreeForShard.{cpp,h}, src/Storages/StorageDistributed.cpp (deletion of RewriteInToGlobalInVisitor inline class, refactored into buildQueryTreeForShard.cpp), and integration tests test_s3_cluster/test.py, test_database_iceberg/test.py, test_storage_iceberg_with_spark/test_cluster_joins.py. Method: static analysis (call graph + transition matrix + logical fault injection on the new GLOBAL branch and the refactored helpers). No local build/run.

Confirmed defects

Medium: RewriteJoinToGlobalJoinVisitor leaks object_storage_cluster_join_mode=global into unrelated callers (parallel replicas + ClusterProxy)

  • Impact: Whenever a user has object_storage_cluster_join_mode='global' set in the session/profile, queries that go through parallel replicas or ClusterProxy::executeQueryWithCluster* paths now silently override parallel_replicas_prefer_local_join=true and force JOINs to JoinLocality::Global even when neither side is an IStorageCluster. Behavior of unrelated features changes based on an object-storage-only setting.
  • Anchor: src/Storages/buildQueryTreeForShard.cppRewriteJoinToGlobalJoinVisitor::enterImpl
  • Trigger: Session setting object_storage_cluster_join_mode='global' + any query that calls rewriteJoinToGlobalJoin from Planner/findParallelReplicasQuery.cpp:513 or Interpreters/ClusterProxy/executeQuery.cpp:832 (e.g., a normal MergeTree JOIN MergeTree under parallel replicas with parallel_replicas_prefer_local_join=1).
  • Why defect: The new check is prefer_local_join = ...prefer_local_join && ...object_storage_cluster_join_mode != GLOBAL. The visitor is shared by three callers but only one (the new IStorageCluster GLOBAL branch) should care about this setting. A session-wide flip of object_storage_cluster_join_mode thus mutates parallel-replicas/cluster-proxy JOIN locality decisions transparently.
  • Fix direction: Gate the override behind a visitor-constructor parameter (or pass it via the caller) instead of reading object_storage_cluster_join_mode from context inside the visitor.
  • Regression test direction: SELECT … FROM merge_tree A JOIN merge_tree B ON … under max_parallel_replicas=2, parallel_replicas_prefer_local_join=1, object_storage_cluster_join_mode='global'; assert plan keeps the JOIN local (not GLOBAL).

Medium: GLOBAL mode loses GLOBAL IN rewrite when the IN subquery only references StorageDistributed

  • Impact: For WHERE x IN (SELECT … FROM distributed_table) against an s3Cluster/iceberg cluster with object_storage_cluster_join_mode='global', the local IN is shipped verbatim to swarm nodes instead of being executed once on the initiator and broadcast as a temporary table. Each swarm node re-executes the subquery (potentially N× cost) and, if the swarm cluster does not know distributed_table's underlying cluster, the query fails on swarm nodes. User asked for GLOBAL semantics but got local-IN semantics.
  • Anchor: src/Storages/buildQueryTreeForShard.cppRewriteInToGlobalInVisitor::enterImpl (lines ~812–847); reached from IStorageCluster::updateQueryWithJoinToSendIfNeeded GLOBAL case (IStorageCluster.cpp ~324).
  • Trigger: object_storage_cluster_join_mode='global' + a query whose IN-subquery's join tree contains only StorageDistributed tables. Smallest case: SELECT … FROM iceberg_cluster(…) WHERE x IN (SELECT id FROM distributed_local) SETTINGS object_storage_cluster_join_mode='global'.
  • Why defect: The no_replace flag stays true when every table in extractTableExpressions(query->getJoinTree(), false, true) is a StorageDistributed, so the visitor early-returns without converting the function name. buildQueryTreeForShard then never picks the function up (it is still local in, not globalIn), so the temp-table materialization step is skipped. This logic was correct in its original home (StorageDistributed.cpp::buildQueryTreeDistributed, where peer shards are by construction reachable from each other) but is wrong when reused from an object-storage cluster initiator whose swarm nodes are a separate cluster from the distributed_local cluster.
  • Fix direction: When invoked from the IStorageCluster GLOBAL path, force-rewrite local IN to global IN unconditionally (skip the StorageDistributed-only early-return), e.g. by parameterizing the visitor with a force_global_in flag.
  • Regression test direction: Integration test that creates a Distributed table over a non-swarm cluster, runs SELECT … FROM iceberg(…) WHERE col IN (SELECT … FROM distributed_local) SETTINGS object_storage_cluster_join_mode='global'; assert system.query_log on swarm nodes shows the temp table read, not a distributed_local execution.

Low (latent): SearcherVisitor cannot find any match beyond the first when entry > 1

  • Impact: Dead today (all three call sites pass entry=1); future call with entry >= 2 will return nullptr and trip Can't find … LOGICAL_ERRORs in updateQueryWithJoinToSendIfNeeded/getQueryTreeInfo.
  • Anchor: src/Storages/IStorageCluster.cppSearcherVisitor::needChildVisit
  • Trigger: Future caller using entry >= 2 (parameter was added in this PR but never exercised with non-1 values).
  • Why defect: needChildVisit returns getSubqueryDepth() <= 2 && !passed_node && !current_entry. Once the first matching node increments current_entry to 1, recursion stops everywhere, so siblings and grandchildren are no longer visited and the second match is never reached.
  • Fix direction: Replace !current_entry with current_entry < entry (or drop the check entirely and rely on !passed_node).
  • Regression test direction: Add a unit/gtest that constructs a query with two TABLE_FUNCTION nodes and asserts SearcherVisitor({TABLE_FUNCTION}, 2, ctx) returns the second one.

Coverage summary

Item Detail
Scope reviewed IStorageCluster::{read, getQueryProcessingStage, updateQueryWithJoinToSendIfNeeded, getQueryTreeInfo}; SearcherVisitor/CollectUsedColumnsForSourceVisitor; new GLOBAL branch + external-tables propagation into ReadFromCluster; buildQueryTreeForShard extension for find_cross_join; DistributedProductModeRewriteInJoinVisitor extension; new rewriteInToGlobalIn/RewriteInToGlobalInVisitor move + signature fix (b63ad83); RewriteJoinToGlobalJoinVisitor setting interaction; the three new integration tests under test_s3_cluster/test_joins and test_storage_iceberg_with_spark/test_cluster_joins.
Categories failed Cross-setting interaction (object_storage_cluster_join_mode leaks into RewriteJoinToGlobalJoinVisitor for non-cluster callers); semantic mismatch of no_replace heuristic inherited from StorageDistributed.
Categories passed Memory lifetime (std::optional<Tables> external_tables is captured by value; temp-table StoragePtrs outlive the ReadFromCluster step via getMutableQueryContext); use-after-move (temporary_table_expression_node is loop-local in the new CROSS-JOIN branch — moved only on the final emplace, falls out of scope immediately after); RTTI (typeid_cast<CrossJoinNode> properly gated by find_cross_join); exception/rollback (no shared mutable state mutated before throw paths); getQueryProcessingStage asymmetric GLOBAL vs LOCAL routing matches intended swarm-side aggregation; buildQueryTreeForShard descendant-map walk correctly handles compound right-side expressions for both JOIN and the new CROSS_JOIN branch; rewriteInToGlobalIn QueryTreeNodePtr & fix in b63ad83 correctly propagates root-replacement back to caller (query_node.getWhere() reference).
Assumptions / limits Static analysis only. Did not verify whether planner_context->getMutableQueryContext() is identical to the context Context::createCopy()'d in the object_storage_remote_initiator path — temp tables added inside updateQueryWithJoinToSendIfNeeded may not be visible to the remote-initiator forwarding path (the new external_tables capture only feeds the local-cluster ReadFromCluster step, not the remote-initiator storage_and_context.storage->read(...) recursion at IStorageCluster.cpp:383). Flagged as an unknown, not a confirmed defect, because the planner context and read-context relationship in that branch was not exhaustively traced. Cross-join multi-table case (FROM A, B, C with the cluster table at index > 0) is by-design unsupported (consistent with LOCAL-mode behavior) and not classified as a defect.

@alsugiliazova alsugiliazova added the verified-with-issues Verified by QA and issues found. label May 15, 2026
@ianton-ru
Copy link
Copy Markdown
Author

Medium: RewriteJoinToGlobalJoinVisitor leaks object_storage_cluster_join_mode=global into unrelated callers

Expected. object_storage_cluster_join_mode has priority over parallel_replicas_prefer_local_join

Medium: GLOBAL mode loses GLOBAL IN rewrite when the IN subquery only references StorageDistributed

Fixed in a8dbd32

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

antalya antalya-26.3 forwardport This is a frontport of code that existed in previous Antalya versions port-antalya PRs to be ported to all new Antalya releases verified-with-issues Verified by QA and issues found.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants