diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp
index c6097b4388c7..49b5fb2ff86b 100644
--- a/src/Storages/StorageMerge.cpp
+++ b/src/Storages/StorageMerge.cpp
@@ -68,6 +68,7 @@ namespace DB
 namespace Setting
 {
     extern const SettingsBool allow_experimental_analyzer;
+    extern const SettingsBool distributed_aggregation_memory_efficient;
     extern const SettingsSeconds lock_acquire_timeout;
     extern const SettingsFloat max_streams_multiplier_for_merge_tables;
     extern const SettingsUInt64 merge_table_max_tables_to_look_for_schema_inference;
@@ -550,16 +551,20 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
 
     pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines));
 
-    if (!query_info.input_order_info)
+    // It's possible to have many tables read from merge; resize(num_streams) might open too many files at the same time.
+    // Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it,
+    // because narrowPipe doesn't preserve order. Also, if we are doing a memory-efficient distributed aggregation, bucket
+    // order must be preserved.
+    const bool should_not_narrow = query_info.input_order_info || (
+        context->getSettingsRef()[Setting::distributed_aggregation_memory_efficient]
+        && common_processed_stage == QueryProcessingStage::Enum::WithMergeableState);
+    if (!should_not_narrow)
     {
         size_t tables_count = selected_tables.size();
         Float64 num_streams_multiplier = std::min(
             tables_count,
             std::max(1UL, static_cast<size_t>(context->getSettingsRef()[Setting::max_streams_multiplier_for_merge_tables])));
         size_t num_streams = static_cast<size_t>(requested_num_streams * num_streams_multiplier);
 
-        // It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
-        // Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it,
-        // because narrowPipe doesn't preserve order.
         pipeline.narrow(num_streams);
     }
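A note on the reasoning in the comment above: narrowing combines several output streams into fewer by gluing them together end to end, so chunks keep their within-stream order but each resulting stream is no longer bucket-ordered across its sources. The standalone sketch below (illustrative C++, not ClickHouse code; `Chunk`, `isBucketOrdered` and `concatenate` are invented for the demo) shows why that is fatal for memory-efficient distributed aggregation: every input is expected to emit two-level aggregation buckets in increasing order, and concatenation makes the bucket sequence restart mid-stream.

```cpp
// Standalone sketch (not ClickHouse code): why concatenating bucket-ordered
// streams breaks a single-pass, memory-bounded merge of aggregation states.
#include <cstdio>
#include <vector>

struct Chunk { int bucket; long partial_sum; };

// Each source emits chunks with non-decreasing bucket numbers. That lets a
// merger finalize bucket B as soon as every source has moved past B, keeping
// only one bucket's worth of state in memory at a time.
bool isBucketOrdered(const std::vector<Chunk> & stream)
{
    for (size_t i = 1; i < stream.size(); ++i)
        if (stream[i].bucket < stream[i - 1].bucket)
            return false;
    return true;
}

// Roughly what narrowing does: glue whole streams together end to end.
std::vector<Chunk> concatenate(std::vector<Chunk> a, const std::vector<Chunk> & b)
{
    a.insert(a.end(), b.begin(), b.end());
    return a;
}

int main()
{
    std::vector<Chunk> table1 = {{0, 10}, {1, 20}, {2, 30}};
    std::vector<Chunk> table2 = {{0, 1}, {1, 2}, {2, 3}};

    // Kept as separate pipeline streams, both inputs satisfy the invariant.
    std::printf("table1 ordered: %d\n", isBucketOrdered(table1));
    std::printf("table2 ordered: %d\n", isBucketOrdered(table2));

    // After concatenation the bucket numbers restart mid-stream, so a
    // single-pass merger that already finalized bucket 0 would see it again.
    std::printf("narrowed ordered: %d\n", isBucketOrdered(concatenate(table1, table2)));
}
```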
diff --git a/tests/config/config.d/clusters.xml b/tests/config/config.d/clusters.xml
index 1a507a76fd85..e08b0fe17fa7 100644
--- a/tests/config/config.d/clusters.xml
+++ b/tests/config/config.d/clusters.xml
@@ -258,6 +258,188 @@
+        <test_cluster_thirty_shards_localhost>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>localhost</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster_thirty_shards_localhost>
                 <internal_replication>true</internal_replication>
diff --git a/tests/queries/0_stateless/03403_distributed_merge_two_level_aggregation.reference b/tests/queries/0_stateless/03403_distributed_merge_two_level_aggregation.reference
new file mode 100644
index 000000000000..15ba0a45b720
--- /dev/null
+++ b/tests/queries/0_stateless/03403_distributed_merge_two_level_aggregation.reference
@@ -0,0 +1,20 @@
+(Expression)
+ExpressionTransform × 4
+  (MergingAggregated)
+  Resize 4 → 4
+    MergingAggregatedBucketTransform × 4
+      Resize 1 → 4
+        GroupingAggregatedTransform 2 → 1
+          (Union)
+            (MergingAggregated)
+            SortingAggregatedTransform 4 → 1
+              MergingAggregatedBucketTransform × 4
+                Resize 1 → 4
+                  GroupingAggregatedTransform 60 → 1
+                    (ReadFromMerge)
+            (MergingAggregated)
+            SortingAggregatedTransform 4 → 1
+              MergingAggregatedBucketTransform × 4
+                Resize 1 → 4
+                  GroupingAggregatedTransform 60 → 1
+                    (ReadFromMerge)
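For readers of the reference above: GroupingAggregatedTransform gathers chunks from all of its inputs grouped by bucket number, MergingAggregatedBucketTransform merges one bucket at a time, and SortingAggregatedTransform re-emits the merged buckets in ascending order. The sketch below is a rough, single-threaded rendering of that protocol (invented names and types, not the actual transforms) showing why it only needs one live bucket of state, provided every input is bucket-ordered, which is exactly the invariant the StorageMerge change preserves.

```cpp
// Rough single-threaded sketch of the bucket protocol visible in the
// EXPLAIN PIPELINE reference; names and types are invented for illustration.
#include <cstdio>
#include <vector>

struct Chunk { int bucket; long partial_sum; };

int main()
{
    // Two bucket-ordered inputs, as produced by two-level aggregation on shards.
    std::vector<std::vector<Chunk>> inputs = {
        {{0, 10}, {1, 20}, {2, 30}},
        {{0, 1}, {2, 3}},          // an input may skip a bucket entirely
    };
    std::vector<size_t> pos(inputs.size(), 0);

    // "GroupingAggregated" step (sketch): repeatedly pick the smallest bucket
    // number at the head of any input and gather that bucket from every input.
    // "MergingAggregatedBucket" step (sketch): merge the gathered chunks.
    // Because each input is bucket-ordered, only one bucket is live at a time.
    while (true)
    {
        int next_bucket = -1;
        for (size_t i = 0; i < inputs.size(); ++i)
            if (pos[i] < inputs[i].size()
                && (next_bucket < 0 || inputs[i][pos[i]].bucket < next_bucket))
                next_bucket = inputs[i][pos[i]].bucket;
        if (next_bucket < 0)
            break;

        long merged = 0;
        for (size_t i = 0; i < inputs.size(); ++i)
            if (pos[i] < inputs[i].size() && inputs[i][pos[i]].bucket == next_bucket)
                merged += inputs[i][pos[i]++].partial_sum;

        // "SortingAggregated" step (sketch): buckets leave in ascending order,
        // so downstream consumers see a deterministic, ordered stream.
        std::printf("bucket %d -> %ld\n", next_bucket, merged);
    }
}
```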
diff --git a/tests/queries/0_stateless/03403_distributed_merge_two_level_aggregation.sql b/tests/queries/0_stateless/03403_distributed_merge_two_level_aggregation.sql
new file mode 100644
index 000000000000..641d7aba793e
--- /dev/null
+++ b/tests/queries/0_stateless/03403_distributed_merge_two_level_aggregation.sql
@@ -0,0 +1,42 @@
+-- Tags: no-fasttest, long
+
+DROP TABLE IF EXISTS test_table_1;
+CREATE TABLE test_table_1(number UInt64) ENGINE = MergeTree ORDER BY number;
+SYSTEM STOP MERGES test_table_1;
+
+DROP TABLE IF EXISTS dist_test_table_1;
+CREATE TABLE dist_test_table_1(number UInt64) ENGINE = Distributed('test_cluster_thirty_shards_localhost', currentDatabase(), test_table_1, rand());
+INSERT INTO dist_test_table_1 SELECT number FROM numbers_mt(10000) SETTINGS distributed_foreground_insert = 1;
+
+DROP TABLE IF EXISTS test_table_2;
+CREATE TABLE test_table_2(number UInt64) ENGINE = MergeTree ORDER BY number;
+SYSTEM STOP MERGES test_table_2;
+
+DROP TABLE IF EXISTS dist_test_table_2;
+CREATE TABLE dist_test_table_2(number UInt64) ENGINE = Distributed('test_cluster_thirty_shards_localhost', currentDatabase(), test_table_2, rand());
+INSERT INTO dist_test_table_2 SELECT number FROM numbers_mt(10000) SETTINGS distributed_foreground_insert = 1;
+
+DROP TABLE IF EXISTS merge_test_table;
+CREATE TABLE merge_test_table ENGINE = Merge(currentDatabase(), '^dist_test_table_(1|2)$');
+
+EXPLAIN PIPELINE
+SELECT
+    cityHash64(number),
+    sum(1)
+FROM remote('127.0.0.{1,1}', currentDatabase(), merge_test_table)
+GROUP BY 1
+SETTINGS distributed_aggregation_memory_efficient = 1, max_threads = 4, optimize_aggregation_in_order = 0, prefer_localhost_replica = 1, async_socket_for_remote = 1, enable_analyzer = 0, enable_memory_bound_merging_of_aggregation_results = 0;
+
+SELECT
+    cityHash64(number),
+    sum(1)
+FROM remote('127.0.0.{1,1}', currentDatabase(), merge_test_table)
+GROUP BY 1
+FORMAT Null
+SETTINGS distributed_aggregation_memory_efficient = 1, max_threads = 4, optimize_aggregation_in_order = 0, prefer_localhost_replica = 1, async_socket_for_remote = 1, enable_analyzer = 0, enable_memory_bound_merging_of_aggregation_results = 0, max_memory_usage = '500Mi', group_by_two_level_threshold = 1e6, group_by_two_level_threshold_bytes = '500Mi';
+
+DROP TABLE merge_test_table;
+DROP TABLE dist_test_table_1;
+DROP TABLE dist_test_table_2;
+DROP TABLE test_table_1;
+DROP TABLE test_table_2;
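The buckets that flow through the pipeline above are produced by two-level aggregation, whose thresholds the second query pins explicitly. A simplified sketch of the single-to-two-level conversion follows (the tiny threshold, container types and names are illustrative and not ClickHouse's implementation, though ClickHouse's two-level hash tables do use 256 buckets): once the single-level table outgrows the threshold, states are re-hashed into fixed buckets, and those buckets are what can then be merged and shipped one at a time, in bucket order.

```cpp
// Simplified sketch of two-level aggregation state conversion; constants and
// containers are illustrative, not ClickHouse's implementation.
#include <array>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <unordered_map>

constexpr size_t NUM_BUCKETS = 256;       // ClickHouse also uses 256 buckets
constexpr size_t TWO_LEVEL_THRESHOLD = 4; // tiny threshold for the demo

int main()
{
    std::unordered_map<uint64_t, long> single_level;
    std::array<std::unordered_map<uint64_t, long>, NUM_BUCKETS> two_level;
    bool converted = false;

    for (uint64_t key = 0; key < 10; ++key)
    {
        if (!converted && single_level.size() >= TWO_LEVEL_THRESHOLD)
        {
            // Convert: redistribute existing states into buckets by key hash.
            for (const auto & [k, v] : single_level)
                two_level[std::hash<uint64_t>{}(k) % NUM_BUCKETS][k] += v;
            single_level.clear();
            converted = true;
        }
        if (converted)
            two_level[std::hash<uint64_t>{}(key) % NUM_BUCKETS][key] += 1;
        else
            single_level[key] += 1;
    }

    // Buckets can now be finalized independently and emitted in bucket order,
    // which is the invariant the StorageMerge fix keeps intact downstream.
    size_t non_empty = 0;
    for (const auto & bucket : two_level)
        non_empty += !bucket.empty();
    std::printf("converted=%d non_empty_buckets=%zu\n", converted, non_empty);
}
```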