diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 807bf03ce4f7..42e9ac18ecfa 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -831,6 +831,14 @@ BucketSplitter FormatFactory::getSplitter(const String & format) return creator.bucket_splitter_creator(); } +bool FormatFactory::checkFormatHasSplitter(const String & format) const +{ + auto it = dict.find(boost::to_lower_copy(format)); + if (it == dict.end()) + return false; + return static_cast(it->second.bucket_splitter_creator); +} + void FormatFactory::registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator) { chassert(input_creator); diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index 76a04bb98798..2aac56dd2d87 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -373,6 +373,9 @@ class FormatFactory final : private boost::noncopyable, public IHints<2> void registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info); void registerSplitter(const String & format, BucketSplitterCreator splitter); BucketSplitter getSplitter(const String & format); + /// Returns true if `format` is registered and has a bucket splitter + /// (e.g. Parquet). Used to decide whether to attempt single-file parallel splitting. + bool checkFormatHasSplitter(const String & format) const; private: FormatsDictionary dict; diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index ab693ba7a212..7030ff3849eb 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -66,6 +66,11 @@ struct IBucketSplitter /// Returns information about the resulting buckets (see the structure above for details). virtual std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) = 0; + /// Splits a file into approximately `target_count` buckets, each covering a roughly + /// equal slice of the file. Useful for parallelising one large file across N readers. + /// The result has at most `target_count` buckets and never drops any data. + virtual std::vector splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) = 0; + virtual ~IBucketSplitter() = default; }; using BucketSplitter = std::shared_ptr; diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp index 72aeabd331ab..fa1459732657 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp @@ -64,8 +64,26 @@ void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_, reader.prefilterAndInitRowGroups(row_groups_to_read); reader.preparePrewhere(); - ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, reader.row_groups.size()); - ProfileEvents::increment(ProfileEvents::ParquetPrunedRowGroups, reader.file_metadata.row_groups.size() - reader.row_groups.size()); + /// Profile events must reflect only the row groups that belong to this bucket, otherwise + /// every bucket of a single-file split would report the file's totals and the events would + /// be multiplied by the number of buckets. + size_t read_count; + size_t total_in_partition; + if (row_groups_to_read.has_value()) + { + read_count = 0; + for (const auto & rg : reader.row_groups) + if (rg.need_to_process) + ++read_count; + total_in_partition = row_groups_to_read->size(); + } + else + { + read_count = reader.row_groups.size(); + total_in_partition = reader.file_metadata.row_groups.size(); + } + ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, read_count); + ProfileEvents::increment(ProfileEvents::ParquetPrunedRowGroups, total_in_partition - read_count); size_t num_row_groups = reader.row_groups.size(); for (size_t i = size_t(ReadStage::NotStarted) + 1; i < size_t(ReadStage::Deliver); ++i) diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 2a88c7f000c9..7163a9eb727b 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -1402,6 +1402,35 @@ std::vector ParquetBucketSplitter::splitToBuckets(size_t buck return result; } +std::vector ParquetBucketSplitter::splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) +{ + std::atomic is_stopped = false; + auto arrow_file = asArrowFile(buf, format_settings_, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, nullptr); + auto metadata = parquet::ReadMetaData(arrow_file); + const size_t num_row_groups = metadata->num_row_groups(); + + if (target_count == 0 || num_row_groups == 0) + return {}; + + /// Distribute row groups across at most target_count contiguous chunks. Each + /// chunk becomes a single ParquetFileBucketInfo containing several row groups, + /// so the caller gets one source per chunk and no row group is dropped. + const size_t num_chunks = std::min(target_count, num_row_groups); + std::vector result; + result.reserve(num_chunks); + for (size_t g = 0; g < num_chunks; ++g) + { + size_t lo = g * num_row_groups / num_chunks; + size_t hi = (g + 1) * num_row_groups / num_chunks; + std::vector ids; + ids.reserve(hi - lo); + for (size_t k = lo; k < hi; ++k) + ids.push_back(k); + result.push_back(std::make_shared(ids)); + } + return result; +} + void registerInputFormatParquet(FormatFactory & factory) { factory.registerFileBucketInfo( diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index 1fea5f0609c1..7dc0e68ecb08 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -71,6 +71,7 @@ struct ParquetBucketSplitter : public IBucketSplitter { ParquetBucketSplitter() = default; std::vector splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) override; + std::vector splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) override; }; class ParquetBlockInputFormat : public IInputFormat diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp index c9498e6dd8c0..01a73159d04a 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp @@ -126,8 +126,23 @@ Chunk ParquetV3BlockInputFormat::read() temp_prefetcher.init(in, read_options, parser_shared_resources); parquet::format::FileMetaData file_metadata = getFileMetadata(temp_prefetcher); + size_t num_rows = 0; + if (buckets_to_read) + { + /// Only count rows in the assigned row groups. Otherwise multiple sources + /// reading buckets of the same file would each report the file's total. + for (size_t rg : buckets_to_read->row_group_ids) + { + if (rg < file_metadata.row_groups.size()) + num_rows += size_t(file_metadata.row_groups[rg].num_rows); + } + } + else + { + num_rows = size_t(file_metadata.num_rows); + } - auto chunk = getChunkForCount(size_t(file_metadata.num_rows)); + auto chunk = getChunkForCount(num_rows); chunk.getChunkInfos().add(std::make_shared(0)); reported_count = true; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index f5e9b285e113..ffd61755081e 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1546,6 +1546,15 @@ Chunk StorageFileSource::generate() progress_callback(FileProgress(0, tryGetFileSizeFromReadBuffer(*read_buf).value_or(0))); } } + else if (fixed_file_path.has_value()) + { + /// This source was assigned to one specific (file, bucket) pair. + /// Consume it exactly once. + if (fixed_file_consumed) + return {}; + fixed_file_consumed = true; + current_path = *fixed_file_path; + } else { current_path = files_iterator->next(); @@ -1572,7 +1581,11 @@ Chunk StorageFileSource::generate() if (getContext()->getSettingsRef()[Setting::engine_file_skip_empty_files] && file_stat.st_size == 0) continue; - if (need_only_count && tryGetCountFromCache(file_stat)) + /// The count cache stores the file's total row count. When this source + /// only reads a subset of the file (file_bucket_info is set), the cache + /// is inapplicable — using it would have every source report the full + /// total and produce a count that's multiplied by the number of buckets. + if (need_only_count && !file_bucket_info && tryGetCountFromCache(file_stat)) continue; read_buf = createReadBuffer(current_path, file_stat, storage->use_table_fd, storage->table_fd, storage->compression_method, getContext()); @@ -1601,6 +1614,12 @@ Chunk StorageFileSource::generate() input_format->setSerializationHints(serialization_hints); + /// If this source was assigned to read only a subset of the file's buckets + /// (used to read one large file with multiple parallel sources), pass the + /// bucket assignment to the format before it starts reading. + if (file_bucket_info) + input_format->setBucketsToRead(file_bucket_info); + if (need_only_count) input_format->needOnlyCount(); @@ -1666,7 +1685,8 @@ Chunk StorageFileSource::generate() finished_generate = true; if (input_format && storage->format_name != "Distributed" && getContext()->getSettingsRef()[Setting::use_cache_for_count_from_files] - && (!format_filter_info || !format_filter_info->hasFilter())) + && (!format_filter_info || !format_filter_info->hasFilter()) + && !file_bucket_info) addNumRowsToCache(current_path, total_rows_in_file); total_rows_in_file = 0; @@ -1869,11 +1889,55 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui if (max_num_streams > files_to_read) num_streams = files_to_read; + auto ctx = getContext(); + + /// If we are reading exactly one local file in a splittable format (e.g. Parquet), + /// we can split it into multiple buckets (row group ranges) and create one source + /// per bucket. This recovers the parallelism we'd otherwise have only when reading + /// many files at once. Without this, a single big Parquet file feeds the whole + /// downstream pipeline through a single source/Resize(1->N) — leaving most of the + /// CPU idle on machines with many cores. + /// + /// We use the file list from `files_iterator` rather than `storage->paths`: the + /// iterator has already pruned files by `_path`/`_file` virtual-column predicates + /// (`createPathAndFileFilterDAG`), so the optimization respects that pruning. If + /// the predicate excludes the only path the file is not read at all. It also + /// means a query against many paths whose predicate prunes down to a single file + /// still benefits from the split. + std::vector per_source_buckets; + String single_file_path; + if (max_num_streams > 1 + && !storage->archive_info + && !storage->use_table_fd + && !storage->has_peekable_read_buffer_from_fd.load() + && !storage->distributed_processing + && storage->compression_method == "auto" + && FormatFactory::instance().checkFormatHasSplitter(storage->format_name) + && FormatFactory::instance().checkParallelizeOutputAfterReading(storage->format_name, ctx) + && files_iterator->getFiles().size() == 1) + { + auto splitter = FormatFactory::instance().getSplitter(storage->format_name); + single_file_path = files_iterator->getFiles().front(); + struct stat file_stat = getFileStat(single_file_path, false, -1, storage->getName()); + if (file_stat.st_size > 0) + { + auto buf = createReadBuffer( + single_file_path, file_stat, false, -1, storage->compression_method, ctx); + auto buckets = splitter->splitToBucketsByCount( + max_num_streams, *buf, + storage->format_settings.value_or(getFormatSettings(ctx))); + + if (buckets.size() >= 2) + { + per_source_buckets = std::move(buckets); + num_streams = per_source_buckets.size(); + } + } + } + Pipes pipes; pipes.reserve(num_streams); - auto ctx = getContext(); - /// Set total number of bytes to process. For progress bar. auto progress_callback = ctx->getFileProgressCallback(); @@ -1904,13 +1968,19 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui parser_shared_resources, format_filter_info); + if (i < per_source_buckets.size()) + { + source->fixed_file_path = single_file_path; + source->file_bucket_info = per_source_buckets[i]; + } + pipes.emplace_back(std::move(source)); } auto pipe = Pipe::unitePipes(std::move(pipes)); size_t output_ports = pipe.numOutputPorts(); const bool parallelize_output = ctx->getSettingsRef()[Setting::parallelize_output_from_storages]; - if (parallelize_output && storage->parallelizeOutputAfterReading(ctx) && output_ports > 0 && output_ports < max_num_streams) + if (parallelize_output && storage->parallelizeOutputAfterReading(ctx) && output_ports > 0 && output_ports != max_num_streams) pipe.resize(max_num_streams); if (pipe.empty()) diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index 82c3a2a7cd49..3e1815025806 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -25,6 +25,9 @@ using OutputFormatPtr = std::shared_ptr; class IInputFormat; using InputFormatPtr = std::shared_ptr; +struct FileBucketInfo; +using FileBucketInfoPtr = std::shared_ptr; + class PullingPipelineExecutor; class StorageFile final : public IStorage @@ -248,6 +251,11 @@ class StorageFileSource : public ISource, WithContext } const String & getFileNameInArchive(); + + /// Returns the (possibly virtual-column-filtered) list of files this iterator + /// will produce. Only meaningful when not reading from an archive and not + /// using distributed_processing. + const std::vector & getFiles() const { return files; } private: std::vector files; @@ -312,6 +320,17 @@ class StorageFileSource : public ISource, WithContext std::shared_ptr archive_reader; std::unique_ptr file_enumerator; + /// Optional subset-of-file assignment. When set, the input format only reads + /// these buckets (e.g. for Parquet — only the listed row groups). This is how + /// a single big file is processed in parallel by multiple sources. + FileBucketInfoPtr file_bucket_info; + + /// When this source has been assigned a specific (file, bucket) pair, it + /// reads only that one file (once) and ignores the shared FilesIterator. + /// Set together with `file_bucket_info`. + std::optional fixed_file_path; + bool fixed_file_consumed = false; + ColumnsDescription columns_description; NamesAndTypesList requested_columns; NamesAndTypesList requested_virtual_columns; diff --git a/tests/queries/0_stateless/02725_parquet_preserve_order.reference b/tests/queries/0_stateless/02725_parquet_preserve_order.reference index 3f410c13ec44..ce91c5aed9ea 100644 --- a/tests/queries/0_stateless/02725_parquet_preserve_order.reference +++ b/tests/queries/0_stateless/02725_parquet_preserve_order.reference @@ -8,5 +8,4 @@ ExpressionTransform (Expression) ExpressionTransform × 2 (ReadFromFile) - Resize 1 → 2 - File 0 → 1 + File × 2 0 → 1