13 changes: 9 additions & 4 deletions src/Storages/StorageMerge.cpp
@@ -68,6 +68,7 @@ namespace DB
 namespace Setting
 {
     extern const SettingsBool allow_experimental_analyzer;
+    extern const SettingsBool distributed_aggregation_memory_efficient;
     extern const SettingsSeconds lock_acquire_timeout;
     extern const SettingsFloat max_streams_multiplier_for_merge_tables;
     extern const SettingsUInt64 merge_table_max_tables_to_look_for_schema_inference;
@@ -550,16 +551,20 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu

     pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines));

-    if (!query_info.input_order_info)
+    // It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
+    // Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it,
+    // because narrowPipe doesn't preserve order. Also, if we are doing a memory efficient distributed aggregation, bucket
+    // order must be preserved.
+    const bool should_not_narrow = query_info.input_order_info || (
+        context->getSettingsRef()[Setting::distributed_aggregation_memory_efficient]
+        && common_processed_stage == QueryProcessingStage::Enum::WithMergeableState);
+    if (!should_not_narrow)
     {
         size_t tables_count = selected_tables.size();
         Float64 num_streams_multiplier = std::min(
             tables_count, std::max(1UL, static_cast<size_t>(context->getSettingsRef()[Setting::max_streams_multiplier_for_merge_tables])));
         size_t num_streams = static_cast<size_t>(requested_num_streams * num_streams_multiplier);

-        // It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
-        // Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it,
-        // because narrowPipe doesn't preserve order.
         pipeline.narrow(num_streams);
     }

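
For context, here is a minimal standalone sketch (not ClickHouse source; every name in it is made up for illustration) of the invariant the new check protects. With memory-efficient distributed aggregation, each stream at the WithMergeableState stage emits two-level hash table buckets in ascending bucket number, and the downstream merger advances one bucket at a time across all inputs, so only a single bucket's state is held in memory. narrowPipe() concatenates several streams into one, which would interleave bucket numbers and break the per-input ordering this relies on:

// Hypothetical sketch: merging equal bucket numbers across individually
// ordered inputs, one bucket at a time.
#include <cstdio>
#include <vector>

int main()
{
    // Two inputs, each individually ordered by bucket number -- the
    // invariant that concatenating (narrowing) streams would break.
    std::vector<std::vector<int>> inputs = {{0, 1, 3}, {0, 2, 3}};
    std::vector<size_t> pos(inputs.size(), 0);

    while (true)
    {
        // Pick the smallest bucket still pending on any input.
        int current = -1;
        for (size_t i = 0; i < inputs.size(); ++i)
            if (pos[i] < inputs[i].size() && (current < 0 || inputs[i][pos[i]] < current))
                current = inputs[i][pos[i]];
        if (current < 0)
            break;

        // Merge every input's chunk for that bucket (here we just count
        // them); the bucket can then be finalized and its memory released.
        int parts = 0;
        for (size_t i = 0; i < inputs.size(); ++i)
        {
            if (pos[i] < inputs[i].size() && inputs[i][pos[i]] == current)
            {
                ++parts;
                ++pos[i];
            }
        }
        std::printf("bucket %d merged from %d input(s)\n", current, parts);
    }
    return 0;
}

If any input delivered buckets out of order, the merger would have already finalized and discarded the state for a bucket that later reappears, which is why the pipeline must not narrow these streams.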
182 changes: 182 additions & 0 deletions tests/config/config.d/clusters.xml
@@ -258,6 +258,188 @@
</replica>
</shard>
</test_cluster_two_shards_localhost>
<test_cluster_thirty_shards_localhost>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_thirty_shards_localhost>
<test_cluster_two_shards_internal_replication>
<shard>
<internal_replication>true</internal_replication>
@@ -0,0 +1,20 @@
(Expression)
ExpressionTransform × 4
(MergingAggregated)
Resize 4 → 4
MergingAggregatedBucketTransform × 4
Resize 1 → 4
GroupingAggregatedTransform 2 → 1
(Union)
(MergingAggregated)
SortingAggregatedTransform 4 → 1
MergingAggregatedBucketTransform × 4
Resize 1 → 4
GroupingAggregatedTransform 60 → 1
(ReadFromMerge)
(MergingAggregated)
SortingAggregatedTransform 4 → 1
MergingAggregatedBucketTransform × 4
Resize 1 → 4
GroupingAggregatedTransform 60 → 1
(ReadFromMerge)
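
Reading the expected pipeline: each GroupingAggregatedTransform 60 → 1 under a ReadFromMerge fans in the mergeable-state streams of the two Distributed tables matched by the Merge table, one per shard of test_cluster_thirty_shards_localhost (2 tables × 30 shards = 60 inputs), which is presumably why the test uses a 30-shard cluster; the top-level GroupingAggregatedTransform 2 → 1 merges the two replicas of the remote() call. The key point of the expected output is that ReadFromMerge hands those 60 streams to GroupingAggregatedTransform without a narrowing step, so each input still delivers its buckets in ascending order.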
@@ -0,0 +1,42 @@
-- Tags: no-fasttest, long

DROP TABLE IF EXISTS test_table_1;
CREATE TABLE test_table_1(number UInt64) ENGINE = MergeTree ORDER BY number;
SYSTEM STOP MERGES test_table_1;

DROP TABLE IF EXISTS dist_test_table_1;
CREATE TABLE dist_test_table_1(number UInt64) ENGINE = Distributed('test_cluster_thirty_shards_localhost', currentDatabase(), test_table_1, rand());
INSERT INTO dist_test_table_1 SELECT number from numbers_mt(10000) SETTINGS distributed_foreground_insert = 1;

DROP TABLE IF EXISTS test_table_2;
CREATE TABLE test_table_2(number UInt64) ENGINE = MergeTree ORDER BY number;
SYSTEM STOP MERGES test_table_2;

DROP TABLE IF EXISTS dist_test_table_2;
CREATE TABLE dist_test_table_2(number UInt64) ENGINE = Distributed('test_cluster_thirty_shards_localhost', currentDatabase(), test_table_2, rand());
INSERT INTO dist_test_table_2 SELECT number from numbers_mt(10000) SETTINGS distributed_foreground_insert = 1;

DROP TABLE IF EXISTS merge_test_table;
CREATE TABLE merge_test_table ENGINE = Merge(currentDatabase(), '^dist_test_table_(1|2)$');

EXPLAIN PIPELINE
SELECT
cityHash64(number),
sum(1)
FROM remote('127.0.0.{1,1}', currentDatabase(), merge_test_table)
GROUP BY 1
SETTINGS distributed_aggregation_memory_efficient = 1, max_threads = 4, optimize_aggregation_in_order = 0, prefer_localhost_replica = 1, async_socket_for_remote = 1, enable_analyzer = 0, enable_memory_bound_merging_of_aggregation_results = 0;

SELECT
cityHash64(number),
sum(1)
FROM remote('127.0.0.{1,1}', currentDatabase(), merge_test_table)
GROUP BY 1
FORMAT Null
SETTINGS distributed_aggregation_memory_efficient = 1, max_threads = 4, optimize_aggregation_in_order = 0, prefer_localhost_replica = 1, async_socket_for_remote = 1, enable_analyzer = 0, enable_memory_bound_merging_of_aggregation_results = 0, max_memory_usage='500Mi', group_by_two_level_threshold=1e6, group_by_two_level_threshold_bytes='500Mi';

DROP TABLE merge_test_table;
DROP TABLE dist_test_table_1;
DROP TABLE dist_test_table_2;
DROP TABLE test_table_1;
DROP TABLE test_table_2;