Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con

const auto is_secondary_query = context_->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;

const auto catalog_uuid = table_metadata.getTableUUID();
const UUID table_uuid = catalog_uuid ? parseFromString<UUID>(*catalog_uuid) : UUIDHelpers::Nil;

std::string cluster_name = configuration->isClusterSupported() ? settings[DatabaseDataLakeSetting::object_storage_cluster].value : "";

if (cluster_name.empty() && can_use_parallel_replicas && !is_secondary_query)
Expand All @@ -680,8 +683,8 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
auto storage_cluster = std::make_shared<StorageObjectStorageCluster>(
cluster_name,
configuration,
configuration->createObjectStorage(context_copy, /* is_readonly */ false, catalog->getCredentialsConfigurationCallback(StorageID(getDatabaseName(), name))),
StorageID(getDatabaseName(), name),
configuration->createObjectStorage(context_copy, /* is_readonly */ false, catalog->getCredentialsConfigurationCallback(StorageID(getDatabaseName(), name, table_uuid))),
StorageID(getDatabaseName(), name, table_uuid),
/* columns */columns,
/* constraints */ConstraintsDescription{},
/* partition_by */nullptr,
Expand Down
4 changes: 4 additions & 0 deletions src/Databases/DataLake/ICatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class TableMetadata
void setDataLakeSpecificProperties(std::optional<DataLakeSpecificProperties> && metadata);
std::optional<DataLakeSpecificProperties> getDataLakeSpecificProperties() const;

/// Record the table UUID string as reported by the catalog (see RestCatalog/UnityCatalog callers).
void setTableUUID(const std::string & uuid_) { table_uuid = uuid_; }
/// UUID string previously stored via setTableUUID(); nullopt when the catalog did not report one.
std::optional<std::string> getTableUUID() const { return table_uuid; }

bool requiresLocation() const { return with_location; }
bool requiresSchema() const { return with_schema; }
bool requiresCredentials() const { return with_storage_credentials; }
Expand Down Expand Up @@ -126,6 +129,7 @@ class TableMetadata
std::optional<DataLakeSpecificProperties> data_lake_specific_metadata;

std::string reason_why_table_is_not_readable;
std::optional<std::string> table_uuid;

bool is_default_readable_table = true;

Expand Down
3 changes: 3 additions & 0 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,9 @@ bool RestCatalog::getTableMetadataImpl(
}
}

if (metadata_object->has("table-uuid"))
result.setTableUUID(metadata_object->get("table-uuid").extract<String>());

return true;
}

Expand Down
3 changes: 3 additions & 0 deletions src/Databases/DataLake/UnityCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ bool UnityCatalog::tryGetTableMetadata(
LOG_DEBUG(log, "Doesn't require schema");
}

if (hasValueAndItsNotNone("table_id", object))
result.setTableUUID(object->get("table_id").extract<String>());

if (result.isDefaultReadableTable() && result.requiresCredentials())
getCredentials(object->get("table_id"), result);

Expand Down
15 changes: 15 additions & 0 deletions src/Formats/FormatFilterInfo.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Formats/FormatFilterInfo.h>
#include <Core/Settings.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/VirtualColumnUtils.h>
#include <Interpreters/ExpressionActions.h>

#include <DataTypes/DataTypeTuple.h>
Expand All @@ -9,6 +10,8 @@
#include <Columns/IColumn.h>
#include <Core/TypeId.h>

#include <Interpreters/Context.h>

namespace DB
{

Expand All @@ -18,6 +21,11 @@ namespace ErrorCodes
extern const int ICEBERG_SPECIFICATION_VIOLATION;
}

namespace Setting
{
extern const SettingsBool use_query_condition_cache;
}

void ColumnMapper::setStorageColumnEncoding(std::unordered_map<String, Int64> && storage_encoding_)
{
chassert(storage_encoding.empty());
Expand Down Expand Up @@ -60,6 +68,13 @@ FormatFilterInfo::FormatFilterInfo(
, prewhere_info(std::move(prewhere_info_))
, column_mapper(column_mapper_)
{
bool use_query_condition_cache = context_->getSettingsRef()[Setting::use_query_condition_cache];
if (use_query_condition_cache && filter_actions_dag)
{
const auto & outputs = filter_actions_dag->getOutputs();
if (outputs.size() == 1 && VirtualColumnUtils::isDeterministic(outputs[0]))
condition_hash = filter_actions_dag->getHash();
}
}

FormatFilterInfo::FormatFilterInfo() = default;
Expand Down
1 change: 1 addition & 0 deletions src/Formats/FormatFilterInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct FormatFilterInfo

ColumnMapperPtr column_mapper;

std::optional<size_t> condition_hash;
private:
/// For lazily initializing the fields above.
std::once_flag init_flag;
Expand Down
1 change: 1 addition & 0 deletions src/Processors/Formats/IInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
namespace DB
{


/// Attaches row numbering info to a chunk: row_num_offset is presumably the absolute
/// index of the chunk's first row within the file (TODO confirm against producers),
/// and applied_filter, when set, is the row filter that was already applied while reading.
ChunkInfoRowNumbers::ChunkInfoRowNumbers(size_t row_num_offset_, std::optional<IColumnFilter> applied_filter_)
    : row_num_offset(row_num_offset_), applied_filter(std::move(applied_filter_)) { }

Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Formats/IInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct FileBucketInfo
virtual void deserialize(ReadBuffer & buffer) = 0;
virtual String getIdentifier() const = 0;
virtual String getFormatName() const = 0;
virtual std::shared_ptr<FileBucketInfo> filterByMatchingRowGroups(const std::vector<size_t> & matching_row_groups) const = 0;

virtual ~FileBucketInfo() = default;
};
Expand Down Expand Up @@ -128,6 +129,8 @@ class IInputFormat : public ISource

void needOnlyCount() { need_only_count = true; }

virtual std::optional<std::pair<std::vector<size_t>, size_t>> getMatchedBuckets() const { return std::nullopt; }

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

Expand Down
16 changes: 16 additions & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,22 @@ ParquetFileBucketInfo::ParquetFileBucketInfo(const std::vector<size_t> & row_gro
{
}

std::shared_ptr<FileBucketInfo> ParquetFileBucketInfo::filterByMatchingRowGroups(const std::vector<size_t> & matching_row_groups) const
{
if (matching_row_groups.empty())
return nullptr;
if (row_group_ids.empty())
return std::make_shared<ParquetFileBucketInfo>(matching_row_groups);
std::unordered_set<size_t> matching_set(matching_row_groups.begin(), matching_row_groups.end());
std::vector<size_t> filtered;
for (size_t rg : row_group_ids)
if (matching_set.contains(rg))
filtered.push_back(rg);
if (filtered.empty())
return nullptr;
return std::make_shared<ParquetFileBucketInfo>(std::move(filtered));
}

void registerParquetFileBucketInfo(std::unordered_map<String, FileBucketInfoPtr> & instances)
{
instances.emplace("Parquet", std::make_shared<ParquetFileBucketInfo>());
Expand Down
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct ParquetFileBucketInfo : public FileBucketInfo
{
return "Parquet";
}
std::shared_ptr<FileBucketInfo> filterByMatchingRowGroups(const std::vector<size_t> & matching_row_groups) const override;
};
using ParquetFileBucketInfoPtr = std::shared_ptr<ParquetFileBucketInfo>;

Expand Down
26 changes: 26 additions & 0 deletions src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,32 @@ Chunk ParquetV3BlockInputFormat::read()
return std::move(res.chunk);
}

/// Reports which row groups produced at least one passing row, paired with the total
/// number of row groups in the file. Returns nullopt while no reader has been created.
std::optional<std::pair<std::vector<size_t>, size_t>> ParquetV3BlockInputFormat::getMatchedBuckets() const
{
    if (!reader)
        return std::nullopt;

    std::vector<size_t> matched_row_groups;
    for (const auto & row_group : reader->reader.row_groups)
    {
        if (!row_group.need_to_process)
            continue;

        /// A row group counts as matched when any of its subgroups let rows through the filter.
        const bool any_rows_passed = std::any_of(
            row_group.subgroups.begin(),
            row_group.subgroups.end(),
            [](const auto & subgroup) { return subgroup.filter.rows_pass > 0; });

        if (any_rows_passed)
            matched_row_groups.push_back(row_group.row_group_idx);
    }

    return std::make_pair(std::move(matched_row_groups), reader->reader.file_metadata.row_groups.size());
}

void ParquetV3BlockInputFormat::setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_)
{
if (reader)
Expand Down
2 changes: 2 additions & 0 deletions src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class ParquetV3BlockInputFormat : public IInputFormat

void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) override;

std::optional<std::pair<std::vector<size_t>, size_t>> getMatchedBuckets() const override;

private:
Chunk read() override;

Expand Down
3 changes: 3 additions & 0 deletions src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace Setting


ReadFromObjectStorageStep::ReadFromObjectStorageStep(
const StorageID & storage_id_,
ObjectStoragePtr object_storage_,
StorageObjectStorageConfigurationPtr configuration_,
const Names & columns_to_read,
Expand All @@ -42,6 +43,7 @@ ReadFromObjectStorageStep::ReadFromObjectStorageStep(
size_t max_block_size_,
size_t num_streams_)
: SourceStepWithFilter(std::make_shared<const Block>(info_.source_header), columns_to_read, query_info_, storage_snapshot_, context_)
, storage_id(storage_id_)
, object_storage(object_storage_)
, configuration(configuration_)
, info(std::move(info_))
Expand Down Expand Up @@ -110,6 +112,7 @@ void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeli
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<StorageObjectStorageSource>(
storage_id,
getName(),
object_storage,
configuration,
Expand Down
2 changes: 2 additions & 0 deletions src/Processors/QueryPlan/ReadFromObjectStorageStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class ReadFromObjectStorageStep : public SourceStepWithFilter
{
public:
ReadFromObjectStorageStep(
const StorageID & storage_id_,
ObjectStoragePtr object_storage_,
StorageObjectStorageConfigurationPtr configuration_,
const Names & columns_to_read,
Expand Down Expand Up @@ -46,6 +47,7 @@ class ReadFromObjectStorageStep : public SourceStepWithFilter
InputOrderInfoPtr getDataOrder() const;

private:
StorageID storage_id;
ObjectStoragePtr object_storage;
StorageObjectStorageConfigurationPtr configuration;
std::shared_ptr<IObjectIterator> iterator_wrapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,20 +347,19 @@ ProcessedManifestFileEntryPtr ManifestFileIterator::processRow(size_t row_index)
const auto schema_id_opt = schema_processor_ptr->tryGetSchemaIdForSnapshot(resolved_snapshot_id);
if (!schema_id_opt.has_value())
{
/// Error logged but not thrown to avoid breaking whole query because of backward compatibility reasons.
/// That's actually an error because it can lead to incorrect query results, so we are creating an exception to put it to system.error_log.
try
{
throw Exception(
ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION,
"Cannot read Iceberg table: manifest file '{}' has entry with snapshot_id '{}' for which write file schema is unknown",
manifest_file_name,
resolved_snapshot_id);
}
catch (const Exception &)
{
tryLogCurrentException("ICEBERG_SPECIFICATION_VIOLATION", "", LogsLevel::error);
}
/// This is expected when the referenced snapshot was expired by the catalog (snapshot expiry is a
/// normal Iceberg housekeeping operation). For example, after a compaction ("replace" operation),
/// the new snapshot's manifest list inherits manifests from the now-expired parent snapshot, and
/// those manifests still carry the original snapshot_id. The manifest file's own Avro header
/// records the correct schema_id for the data files it describes, so falling back to
/// manifest_schema_id is safe and correct in this case.
LOG_DEBUG(
getLogger("ManifestFileIterator"),
"Manifest file '{}' has entry with snapshot_id '{}' whose snapshot metadata is not present "
"(snapshot may have been expired by the catalog). Falling back to manifest schema_id {}.",
path_to_manifest_file,
resolved_snapshot_id,
manifest_schema_id);
}
const auto resolved_schema_id = schema_id_opt.has_value() ? *schema_id_opt : manifest_schema_id;

Expand Down
46 changes: 42 additions & 4 deletions src/Storages/ObjectStorage/IObjectIterator.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include <Storages/VirtualColumnUtils.h>
#include <Storages/ObjectStorage/IObjectIterator.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Cache/QueryConditionCache.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatFilterInfo.h>
#include <Interpreters/Context.h>
#include <Disks/DiskObjectStorage/ObjectStorages/IObjectStorage.h>
#include <IO/ReadBufferFromFileBase.h>
Expand Down Expand Up @@ -107,13 +109,19 @@ ObjectIteratorSplitByBuckets::ObjectIteratorSplitByBuckets(
ObjectIterator iterator_,
const String & format_,
ObjectStoragePtr object_storage_,
const ContextPtr & context_)
const ContextPtr & context_,
const StorageID & storage_id_,
FormatFilterInfoPtr format_filter_info_)
: WithContext(context_)
, iterator(iterator_)
, format(format_)
, object_storage(object_storage_)
, format_settings(getFormatSettings(context_))
, storage_id(storage_id_)
, format_filter_info(std::move(format_filter_info_))
{
if (format_filter_info && format_filter_info->condition_hash)
query_condition_cache = context_->getQueryConditionCache();
}

ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id)
Expand All @@ -127,13 +135,43 @@ ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id)
auto splitter = FormatFactory::instance().getSplitter(format);
if (splitter)
{
std::vector<size_t> matching_row_groups;
bool has_cache_entry = false;
if (query_condition_cache)
{
auto matching_marks = query_condition_cache->read(
storage_id.uuid,
last_object_info->getFileName(),
*format_filter_info->condition_hash);
if (matching_marks.has_value())
{
has_cache_entry = true;
const auto & marks = *matching_marks;
for (size_t i = 0; i < marks.size(); ++i)
if (marks[i])
matching_row_groups.push_back(i);
if (matching_row_groups.empty())
continue;
}
}

auto buffer = createReadBuffer(last_object_info->relative_path_with_metadata, object_storage, getContext(), log);
size_t bucket_size = getContext()->getSettingsRef()[Setting::cluster_table_function_buckets_batch_size];
auto file_bucket_info = splitter->splitToBuckets(bucket_size, *buffer, format_settings);
for (const auto & file_bucket : file_bucket_info)
auto file_bucket_infos = splitter->splitToBuckets(bucket_size, *buffer, format_settings);
for (const auto & file_bucket : file_bucket_infos)
{
auto copy_object_info = *last_object_info;
copy_object_info.file_bucket_info = file_bucket;
if (has_cache_entry)
{
auto filtered = file_bucket->filterByMatchingRowGroups(matching_row_groups);
if (!filtered)
continue;
copy_object_info.file_bucket_info = std::move(filtered);
}
else
{
copy_object_info.file_bucket_info = file_bucket;
}
pending_objects_info.push(std::make_shared<ObjectInfo>(copy_object_info));
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/Storages/ObjectStorage/IObjectIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
#include <Disks/DiskObjectStorage/ObjectStorages/IObjectStorage.h>
#include <Processors/ISimpleTransform.h>
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Interpreters/Cache/QueryConditionCache.h>
#include <Interpreters/StorageID.h>
#include <Formats/FormatFilterInfo.h>
#include <Common/Logger.h>
#include <Common/Macros.h>
#include <Formats/FormatSettings.h>
Expand Down Expand Up @@ -109,7 +112,9 @@ class ObjectIteratorSplitByBuckets : public IObjectIterator, private WithContext
ObjectIterator iterator_,
const String & format_,
ObjectStoragePtr object_storage_,
const ContextPtr & context_);
const ContextPtr & context_,
const StorageID & storage_id_ = StorageID::createEmpty(),
FormatFilterInfoPtr format_filter_info_ = nullptr);

ObjectInfoPtr next(size_t) override;
size_t estimatedKeysCount() override { return iterator->estimatedKeysCount(); }
Expand All @@ -120,6 +125,9 @@ class ObjectIteratorSplitByBuckets : public IObjectIterator, private WithContext
String format;
ObjectStoragePtr object_storage;
FormatSettings format_settings;
StorageID storage_id;
FormatFilterInfoPtr format_filter_info;
QueryConditionCachePtr query_condition_cache;

std::queue<ObjectInfoPtr> pending_objects_info;
const LoggerPtr log = getLogger("GlobIterator");
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ void StorageObjectStorage::read(
configuration->modifyFormatSettings(modified_format_settings.value(), *local_context);

auto read_step = std::make_unique<ReadFromObjectStorageStep>(
storage_id,
object_storage,
configuration,
column_names,
Expand Down
Loading
Loading