Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,14 @@ BucketSplitter FormatFactory::getSplitter(const String & format)
return creator.bucket_splitter_creator();
}

bool FormatFactory::checkFormatHasSplitter(const String & format) const
{
auto it = dict.find(boost::to_lower_copy(format));
if (it == dict.end())
return false;
return static_cast<bool>(it->second.bucket_splitter_creator);
}

void FormatFactory::registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator)
{
chassert(input_creator);
Expand Down
3 changes: 3 additions & 0 deletions src/Formats/FormatFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ class FormatFactory final : private boost::noncopyable, public IHints<2>
void registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info);
void registerSplitter(const String & format, BucketSplitterCreator splitter);
BucketSplitter getSplitter(const String & format);
/// Returns true if `format` is registered and has a bucket splitter
/// (e.g. Parquet). Used to decide whether to attempt single-file parallel splitting.
bool checkFormatHasSplitter(const String & format) const;

private:
FormatsDictionary dict;
Expand Down
5 changes: 5 additions & 0 deletions src/Processors/Formats/IInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ struct IBucketSplitter
/// Returns information about the resulting buckets (see the structure above for details).
virtual std::vector<FileBucketInfoPtr> splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) = 0;

/// Splits a file into approximately `target_count` buckets, each covering a roughly
/// equal slice of the file. Useful for parallelising one large file across N readers.
/// The result has at most `target_count` buckets and never drops any data.
virtual std::vector<FileBucketInfoPtr> splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) = 0;

virtual ~IBucketSplitter() = default;
};
using BucketSplitter = std::shared_ptr<IBucketSplitter>;
Expand Down
22 changes: 20 additions & 2 deletions src/Processors/Formats/Impl/Parquet/ReadManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,26 @@ void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_,
reader.prefilterAndInitRowGroups(row_groups_to_read);
reader.preparePrewhere();

ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, reader.row_groups.size());
ProfileEvents::increment(ProfileEvents::ParquetPrunedRowGroups, reader.file_metadata.row_groups.size() - reader.row_groups.size());
/// Profile events must reflect only the row groups that belong to this bucket, otherwise
/// every bucket of a single-file split would report the file's totals and the events would
/// be multiplied by the number of buckets.
size_t read_count;
size_t total_in_partition;
if (row_groups_to_read.has_value())
{
read_count = 0;
for (const auto & rg : reader.row_groups)
if (rg.need_to_process)
++read_count;
total_in_partition = row_groups_to_read->size();
}
else
{
read_count = reader.row_groups.size();
total_in_partition = reader.file_metadata.row_groups.size();
}
ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, read_count);
ProfileEvents::increment(ProfileEvents::ParquetPrunedRowGroups, total_in_partition - read_count);

size_t num_row_groups = reader.row_groups.size();
for (size_t i = size_t(ReadStage::NotStarted) + 1; i < size_t(ReadStage::Deliver); ++i)
Expand Down
29 changes: 29 additions & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,35 @@ std::vector<FileBucketInfoPtr> ParquetBucketSplitter::splitToBuckets(size_t buck
return result;
}

std::vector<FileBucketInfoPtr> ParquetBucketSplitter::splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_)
{
    /// Splits a Parquet file into at most `target_count` contiguous buckets of row groups.
    /// Returns one FileBucketInfo per bucket; together the buckets cover every row group
    /// exactly once, so no data is dropped or duplicated.

    /// Nothing requested — bail out before opening the file and parsing the footer,
    /// which would be wasted (and potentially expensive) I/O.
    if (target_count == 0)
        return {};

    std::atomic<int> is_stopped = false;
    auto arrow_file = asArrowFile(buf, format_settings_, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, nullptr);
    auto metadata = parquet::ReadMetaData(arrow_file);
    const size_t num_row_groups = metadata->num_row_groups();

    /// An empty file (no row groups) cannot be split.
    if (num_row_groups == 0)
        return {};

    /// Distribute row groups across at most target_count contiguous chunks. Each
    /// chunk becomes a single ParquetFileBucketInfo containing several row groups,
    /// so the caller gets one source per chunk and no row group is dropped.
    const size_t num_chunks = std::min(target_count, num_row_groups);
    std::vector<FileBucketInfoPtr> result;
    result.reserve(num_chunks);
    for (size_t g = 0; g < num_chunks; ++g)
    {
        /// Balanced [lo, hi) partition: chunk sizes differ by at most one row group.
        const size_t lo = g * num_row_groups / num_chunks;
        const size_t hi = (g + 1) * num_row_groups / num_chunks;
        std::vector<size_t> ids;
        ids.reserve(hi - lo);
        for (size_t k = lo; k < hi; ++k)
            ids.push_back(k);
        result.push_back(std::make_shared<ParquetFileBucketInfo>(ids));
    }
    return result;
}

void registerInputFormatParquet(FormatFactory & factory)
{
factory.registerFileBucketInfo(
Expand Down
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct ParquetBucketSplitter : public IBucketSplitter
{
ParquetBucketSplitter() = default;
std::vector<FileBucketInfoPtr> splitToBuckets(size_t bucket_size, ReadBuffer & buf, const FormatSettings & format_settings_) override;
std::vector<FileBucketInfoPtr> splitToBucketsByCount(size_t target_count, ReadBuffer & buf, const FormatSettings & format_settings_) override;
};

class ParquetBlockInputFormat : public IInputFormat
Expand Down
17 changes: 16 additions & 1 deletion src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,23 @@ Chunk ParquetV3BlockInputFormat::read()
temp_prefetcher.init(in, read_options, parser_shared_resources);
parquet::format::FileMetaData file_metadata = getFileMetadata(temp_prefetcher);

size_t num_rows = 0;
if (buckets_to_read)
{
/// Only count rows in the assigned row groups. Otherwise multiple sources
/// reading buckets of the same file would each report the file's total.
for (size_t rg : buckets_to_read->row_group_ids)
{
if (rg < file_metadata.row_groups.size())
num_rows += size_t(file_metadata.row_groups[rg].num_rows);
}
}
else
{
num_rows = size_t(file_metadata.num_rows);
}

auto chunk = getChunkForCount(size_t(file_metadata.num_rows));
auto chunk = getChunkForCount(num_rows);
chunk.getChunkInfos().add(std::make_shared<ChunkInfoRowNumbers>(0));

reported_count = true;
Expand Down
80 changes: 75 additions & 5 deletions src/Storages/StorageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1546,6 +1546,15 @@ Chunk StorageFileSource::generate()
progress_callback(FileProgress(0, tryGetFileSizeFromReadBuffer(*read_buf).value_or(0)));
}
}
else if (fixed_file_path.has_value())
{
/// This source was assigned to one specific (file, bucket) pair.
/// Consume it exactly once.
if (fixed_file_consumed)
return {};
fixed_file_consumed = true;
current_path = *fixed_file_path;
}
else
{
current_path = files_iterator->next();
Expand All @@ -1572,7 +1581,11 @@ Chunk StorageFileSource::generate()
if (getContext()->getSettingsRef()[Setting::engine_file_skip_empty_files] && file_stat.st_size == 0)
continue;

if (need_only_count && tryGetCountFromCache(file_stat))
/// The count cache stores the file's total row count. When this source
/// only reads a subset of the file (file_bucket_info is set), the cache
/// is inapplicable — using it would have every source report the full
/// total and produce a count that's multiplied by the number of buckets.
if (need_only_count && !file_bucket_info && tryGetCountFromCache(file_stat))
continue;

read_buf = createReadBuffer(current_path, file_stat, storage->use_table_fd, storage->table_fd, storage->compression_method, getContext());
Expand Down Expand Up @@ -1601,6 +1614,12 @@ Chunk StorageFileSource::generate()

input_format->setSerializationHints(serialization_hints);

/// If this source was assigned to read only a subset of the file's buckets
/// (used to read one large file with multiple parallel sources), pass the
/// bucket assignment to the format before it starts reading.
if (file_bucket_info)
input_format->setBucketsToRead(file_bucket_info);

if (need_only_count)
input_format->needOnlyCount();

Expand Down Expand Up @@ -1666,7 +1685,8 @@ Chunk StorageFileSource::generate()
finished_generate = true;

if (input_format && storage->format_name != "Distributed" && getContext()->getSettingsRef()[Setting::use_cache_for_count_from_files]
&& (!format_filter_info || !format_filter_info->hasFilter()))
&& (!format_filter_info || !format_filter_info->hasFilter())
&& !file_bucket_info)
addNumRowsToCache(current_path, total_rows_in_file);

total_rows_in_file = 0;
Expand Down Expand Up @@ -1869,11 +1889,55 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui
if (max_num_streams > files_to_read)
num_streams = files_to_read;

auto ctx = getContext();

/// If we are reading exactly one local file in a splittable format (e.g. Parquet),
/// we can split it into multiple buckets (row group ranges) and create one source
/// per bucket. This recovers the parallelism we'd otherwise have only when reading
/// many files at once. Without this, a single big Parquet file feeds the whole
/// downstream pipeline through a single source/Resize(1->N) — leaving most of the
/// CPU idle on machines with many cores.
///
/// We use the file list from `files_iterator` rather than `storage->paths`: the
/// iterator has already pruned files by `_path`/`_file` virtual-column predicates
/// (`createPathAndFileFilterDAG`), so the optimization respects that pruning. If
/// the predicate excludes the only path the file is not read at all. It also
/// means a query against many paths whose predicate prunes down to a single file
/// still benefits from the split.
std::vector<FileBucketInfoPtr> per_source_buckets;
String single_file_path;
if (max_num_streams > 1
&& !storage->archive_info
&& !storage->use_table_fd
&& !storage->has_peekable_read_buffer_from_fd.load()
&& !storage->distributed_processing
&& storage->compression_method == "auto"
&& FormatFactory::instance().checkFormatHasSplitter(storage->format_name)
&& FormatFactory::instance().checkParallelizeOutputAfterReading(storage->format_name, ctx)
&& files_iterator->getFiles().size() == 1)
{
auto splitter = FormatFactory::instance().getSplitter(storage->format_name);
single_file_path = files_iterator->getFiles().front();
struct stat file_stat = getFileStat(single_file_path, false, -1, storage->getName());
if (file_stat.st_size > 0)
{
auto buf = createReadBuffer(
single_file_path, file_stat, false, -1, storage->compression_method, ctx);
auto buckets = splitter->splitToBucketsByCount(
max_num_streams, *buf,
storage->format_settings.value_or(getFormatSettings(ctx)));

if (buckets.size() >= 2)
{
per_source_buckets = std::move(buckets);
num_streams = per_source_buckets.size();
}
}
}

Pipes pipes;
pipes.reserve(num_streams);

auto ctx = getContext();

/// Set total number of bytes to process. For progress bar.
auto progress_callback = ctx->getFileProgressCallback();

Expand Down Expand Up @@ -1904,13 +1968,19 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui
parser_shared_resources,
format_filter_info);

if (i < per_source_buckets.size())
{
source->fixed_file_path = single_file_path;
source->file_bucket_info = per_source_buckets[i];
}

pipes.emplace_back(std::move(source));
}

auto pipe = Pipe::unitePipes(std::move(pipes));
size_t output_ports = pipe.numOutputPorts();
const bool parallelize_output = ctx->getSettingsRef()[Setting::parallelize_output_from_storages];
if (parallelize_output && storage->parallelizeOutputAfterReading(ctx) && output_ports > 0 && output_ports < max_num_streams)
if (parallelize_output && storage->parallelizeOutputAfterReading(ctx) && output_ports > 0 && output_ports != max_num_streams)
pipe.resize(max_num_streams);

if (pipe.empty())
Expand Down
19 changes: 19 additions & 0 deletions src/Storages/StorageFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
class IInputFormat;
using InputFormatPtr = std::shared_ptr<IInputFormat>;

struct FileBucketInfo;
using FileBucketInfoPtr = std::shared_ptr<FileBucketInfo>;

class PullingPipelineExecutor;

class StorageFile final : public IStorage
Expand Down Expand Up @@ -248,6 +251,11 @@ class StorageFileSource : public ISource, WithContext
}

const String & getFileNameInArchive();

/// Returns the (possibly virtual-column-filtered) list of files this iterator
/// will produce. Only meaningful when not reading from an archive and not
/// using distributed_processing.
const std::vector<std::string> & getFiles() const { return files; }
private:
std::vector<std::string> files;

Expand Down Expand Up @@ -312,6 +320,17 @@ class StorageFileSource : public ISource, WithContext
std::shared_ptr<IArchiveReader> archive_reader;
std::unique_ptr<IArchiveReader::FileEnumerator> file_enumerator;

/// Optional subset-of-file assignment. When set, the input format only reads
/// these buckets (e.g. for Parquet — only the listed row groups). This is how
/// a single big file is processed in parallel by multiple sources.
FileBucketInfoPtr file_bucket_info;

/// When this source has been assigned a specific (file, bucket) pair, it
/// reads only that one file (once) and ignores the shared FilesIterator.
/// Set together with `file_bucket_info`.
std::optional<String> fixed_file_path;
bool fixed_file_consumed = false;

ColumnsDescription columns_description;
NamesAndTypesList requested_columns;
NamesAndTypesList requested_virtual_columns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ ExpressionTransform
(Expression)
ExpressionTransform × 2
(ReadFromFile)
Resize 1 → 2
File 0 → 1
File × 2 0 → 1
Loading