18 changes: 18 additions & 0 deletions docs/en/engines/table-engines/mergetree-family/part_export.md
@@ -60,6 +60,24 @@ Source and destination tables must be 100% compatible:
- **Default**: `0`
- **Description**: Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit. This is not a hard limit, and it highly depends on the output format granularity and input source chunk size. Using this might break idempotency, use it with care.

### export_merge_tree_part_throw_on_pending_mutations

- **Type**: `bool`
- **Default**: `true`
- **Description**: If set to true, throws if pending mutations exist for a given part. Note that by default mutations are applied to all parts, which means that even if a mutation would in practice only affect part/partition x, all the other parts/partitions will also throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.

### export_merge_tree_part_throw_on_pending_patch_parts

- **Type**: `bool`
- **Default**: `true`
- **Description**: If set to true, throws if pending patch parts exist for a given part. Note that by default mutations are applied to all parts, which means that even if a mutation would in practice only affect part/partition x, all the other parts/partitions will also throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.

### export_merge_tree_part_allow_outdated_parts

- **Type**: `bool`
- **Default**: `false`
- **Description**: Allows outdated parts to be exported. By default, only `ACTIVE` parts can be exported.

## Examples

### Basic Export to S3
12 changes: 12 additions & 0 deletions docs/en/engines/table-engines/mergetree-family/partition_export.md
@@ -70,6 +70,18 @@ TO TABLE [destination_database.]destination_table
- `error` - Throw an error if the file already exists
- `overwrite` - Overwrite the file

### export_merge_tree_part_throw_on_pending_mutations

- **Type**: `bool`
- **Default**: `true`
- **Description**: If set to true, throws if pending mutations exist for a given part. Note that by default mutations are applied to all parts, which means that even if a mutation would in practice only affect part/partition x, all the other parts/partitions will also throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.

### export_merge_tree_part_throw_on_pending_patch_parts

- **Type**: `bool`
- **Default**: `true`
- **Description**: If set to true, throws if pending patch parts exist for a given part. Note that by default mutations are applied to all parts, which means that even if a mutation would in practice only affect part/partition x, all the other parts/partitions will also throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.

## Examples

### Basic Export to S3
3 changes: 2 additions & 1 deletion src/Common/ErrorCodes.cpp
@@ -648,6 +648,7 @@
M(1002, UNKNOWN_EXCEPTION) \
M(1003, SSH_EXCEPTION) \
M(1004, STARTUP_SCRIPTS_ERROR) \
M(1005, PENDING_MUTATIONS_NOT_ALLOWED) \
/* See END */

#ifdef APPLY_FOR_EXTERNAL_ERROR_CODES
Expand All @@ -664,7 +665,7 @@ namespace ErrorCodes
APPLY_FOR_ERROR_CODES(M)
#undef M

-constexpr ErrorCode END = 1004;
+constexpr ErrorCode END = 1005;
ErrorPairHolder values[END + 1]{};

struct ErrorCodesNames
6 changes: 6 additions & 0 deletions src/Core/Settings.cpp
@@ -6920,6 +6920,12 @@ This is not a hard limit, and it highly depends on the output format granularity
DECLARE(UInt64, export_merge_tree_part_max_rows_per_file, 0, R"(
Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit.
This is not a hard limit, and it highly depends on the output format granularity and input source chunk size.
)", 0) \
DECLARE(Bool, export_merge_tree_part_throw_on_pending_mutations, true, R"(
Throw an error if there are pending mutations when exporting a merge tree part.
)", 0) \
DECLARE(Bool, export_merge_tree_part_throw_on_pending_patch_parts, true, R"(
Throw an error if there are pending patch parts when exporting a merge tree part.
)", 0) \
DECLARE(Bool, serialize_string_in_memory_with_zero_byte, true, R"(
Serialize String values during aggregation with zero byte at the end. Enable to keep compatibility when querying cluster of incompatible versions.
2 changes: 2 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -64,6 +64,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"cluster_table_function_split_granularity", "file", "file", "New setting."},
{"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8",
{
6 changes: 6 additions & 0 deletions src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -30,6 +30,11 @@ namespace
context_copy->setSetting("export_merge_tree_part_file_already_exists_policy", String(magic_enum::enum_name(manifest.file_already_exists_policy)));
context_copy->setSetting("export_merge_tree_part_max_bytes_per_file", manifest.max_bytes_per_file);
context_copy->setSetting("export_merge_tree_part_max_rows_per_file", manifest.max_rows_per_file);

/// always skip pending mutations and patch parts because we already validated the parts during query processing
context_copy->setSetting("export_merge_tree_part_throw_on_pending_mutations", false);
context_copy->setSetting("export_merge_tree_part_throw_on_pending_patch_parts", false);

return context_copy;
}
}
@@ -144,6 +149,7 @@ void ExportPartitionTaskScheduler::run()
destination_storage_id,
manifest.transaction_id,
getContextCopyWithTaskSettings(storage.getContext(), manifest),
/*allow_outdated_parts*/ true,
[this, key, zk_part_name, manifest, destination_storage]
(MergeTreePartExportManifest::CompletionCallbackResult result)
{
41 changes: 41 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.cpp
@@ -217,6 +217,8 @@ namespace Setting
extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy;
extern const SettingsBool output_format_parallel_formatting;
extern const SettingsBool output_format_parquet_parallel_encoding;
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
}

namespace MergeTreeSetting
@@ -336,6 +338,7 @@ namespace ErrorCodes
extern const int TOO_LARGE_LIGHTWEIGHT_UPDATES;
extern const int UNKNOWN_TABLE;
extern const int FILE_ALREADY_EXISTS;
extern const int PENDING_MUTATIONS_NOT_ALLOWED;
}

static void checkSuspiciousIndices(const ASTFunction * index_function)
@@ -6222,6 +6225,7 @@ void MergeTreeData::exportPartToTable(
const StorageID & destination_storage_id,
const String & transaction_id,
ContextPtr query_context,
bool allow_outdated_parts,
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback)
{
auto dest_storage = DatabaseCatalog::instance().getTable(destination_storage_id, query_context);
@@ -6254,6 +6258,43 @@ void MergeTreeData::exportPartToTable(
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No such data part '{}' to export in table '{}'",
part_name, getStorageID().getFullTableName());

if (part->getState() == MergeTreeDataPartState::Outdated && !allow_outdated_parts)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Part {} is in the outdated state and cannot be exported",
part_name);

const bool throw_on_pending_mutations = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_mutations];
const bool throw_on_pending_patch_parts = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_patch_parts];

MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params
{
.metadata_version = source_metadata_ptr->getMetadataVersion(),
.min_part_metadata_version = part->getMetadataVersion(),
.need_data_mutations = throw_on_pending_mutations,

Does ClickHouse throw an exception when this flag is set? I only did a quick check, but it looks like only some parts are skipped.

Collaborator Author

> Does ClickHouse throw some exception when this flag is set

It should not, at least in this context. The code you are reviewing processes the export query request; it is not the code that actually exports the data. `mutations_snapshot_params` is used to create `mutations_snapshot` and `alter_conversions`, and neither is used for anything in this function other than checking whether there are pending mutations.

> but looks like only some parts are skipped.

I did not understand this, can you elaborate?

Not clean: why are the `need_something` flags filled with values from `throw_on_some_condition`?
When I see `throw_on_some_condition`, I expect the setting to cause an exception, but here it changes other behavior.

Collaborator Author

TLDR: To throw, I need to know if it exists. To know if it exists, I need to set it to true.

Ah, I see. The `need_something` flags dictate whether the mutation snapshot contains the mutations, patch parts, etc. If I hard-code them to false, the snapshot will never include information about pending mutations or patch parts. In that case, I will not be able to check whether they exist and will not be able to throw.

I see that it throws an exception. I mean that the setting does something more than just throw an exception.

Collaborator Author

What else does it do in this case? I don't think it does anything else.

I am talking about naming, not about code logic :)

The setting `export_merge_tree_part_throw_on_pending_mutations` changes the value of the `need_data_mutations` field in `mutations_snapshot_params`, and `need_data_mutations` changes behavior. So the setting name is confusing; something like `export_merge_tree_part_do_not_export_on_pending_mutations` (I'm not good at naming) would be cleaner.

Collaborator Author

> need_data_mutations changes behavior.

Not in this case. As I said earlier, in this code snippet the only effect of setting `need_data_mutations` is that the snapshot and alter conversions objects will contain information about mutations.

And if they do, we throw, because that object can only have mutations when both of these hold: 1. there are pending mutations; 2. the user set throw = true.

.need_alter_mutations = throw_on_pending_mutations || throw_on_pending_patch_parts,
.need_patch_parts = throw_on_pending_patch_parts,
};

const auto mutations_snapshot = getMutationsSnapshot(mutations_snapshot_params);

const auto alter_conversions = getAlterConversionsForPart(part, mutations_snapshot, query_context);

/// re-check `throw_on_pending_mutations` because `pending_mutations` might have been filled due to `throw_on_pending_patch_parts`
if (throw_on_pending_mutations && alter_conversions->hasMutations())
{
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
"Part {} can not be exported because there are pending mutations. Either wait for the mutations to be applied or set `export_merge_tree_part_throw_on_pending_mutations` to false",
part_name);
}

if (alter_conversions->hasPatches())

Do we need to check `export_merge_tree_part_throw_on_pending_patch_parts` here?

Collaborator Author

No because of #1294 (comment)

Collaborator Author

I can add the check if you think it is clearer, but it is not needed.

{
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
"Part {} can not be exported because there are pending patch parts. Either wait for the patch parts to be applied or set `export_merge_tree_part_throw_on_pending_patch_parts` to false",
part_name);
}
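
The interplay discussed in the review thread above (the `throw_on_*` settings feed the `need_*` flags, and the mutation check re-reads its setting while the patch check does not) can be illustrated with a small standalone model. This is only a sketch under stated assumptions: `SnapshotParams`, `PendingWork`, `Conversions`, `collect`, `checkExportAllowed`, and `exportRejected` are hypothetical names, and the assumption that the snapshot reports only the kinds of work that were requested is taken from the author's explanation, not from the ClickHouse implementation.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-ins for illustration; these are not the ClickHouse types.
struct SnapshotParams
{
    bool need_data_mutations;
    bool need_alter_mutations;
    bool need_patch_parts;
};

// What is really pending for a part, independent of what the snapshot asks for.
struct PendingWork
{
    bool data_mutation;
    bool alter_mutation;
    bool patch_part;
};

// What the snapshot-derived object reports back to the caller.
struct Conversions
{
    bool has_mutations;
    bool has_patches;
};

// Assumed behavior: the snapshot only records the kinds of work that were requested.
Conversions collect(const PendingWork & pending, const SnapshotParams & params)
{
    const bool mutations = (params.need_data_mutations && pending.data_mutation)
        || (params.need_alter_mutations && pending.alter_mutation);
    const bool patches = params.need_patch_parts && pending.patch_part;
    return Conversions{mutations, patches};
}

void checkExportAllowed(const PendingWork & pending, bool throw_on_pending_mutations, bool throw_on_pending_patch_parts)
{
    const SnapshotParams params{
        throw_on_pending_mutations,
        throw_on_pending_mutations || throw_on_pending_patch_parts,
        throw_on_pending_patch_parts};

    const Conversions conversions = collect(pending, params);

    // Patches are only visible when the patch-part setting requested them,
    // which is why the patch check below does not re-read the setting.
    assert(!conversions.has_patches || throw_on_pending_patch_parts);

    // Re-check the setting: alter mutations may be visible only because
    // the patch-part setting turned need_alter_mutations on.
    if (throw_on_pending_mutations && conversions.has_mutations)
        throw std::runtime_error("pending mutations");

    if (conversions.has_patches)
        throw std::runtime_error("pending patch parts");
}

// Helper: returns true if the check rejects the export.
bool exportRejected(const PendingWork & pending, bool throw_on_pending_mutations, bool throw_on_pending_patch_parts)
{
    try
    {
        checkExportAllowed(pending, throw_on_pending_mutations, throw_on_pending_patch_parts);
        return false;
    }
    catch (const std::runtime_error &)
    {
        return true;
    }
}
```

In this model, a part with only a pending alter mutation is not rejected when just the patch-part setting is enabled, even though `has_mutations` is true; that is the case the "re-check" comment in the diff guards against.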

{
const auto format_settings = getFormatSettings(query_context);
MergeTreePartExportManifest manifest(
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeData.h
@@ -989,6 +989,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
const StorageID & destination_storage_id,
const String & transaction_id,
ContextPtr query_context,
bool allow_outdated_parts = false,
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback = {});

void killExportPart(const String & transaction_id);
43 changes: 41 additions & 2 deletions src/Storages/StorageReplicatedMergeTree.cpp
@@ -202,6 +202,8 @@ namespace Setting
extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy;
extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file;
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
}

namespace MergeTreeSetting
@@ -305,6 +307,7 @@ namespace ErrorCodes
extern const int CANNOT_FORGET_PARTITION;
extern const int TIMEOUT_EXCEEDED;
extern const int INVALID_SETTING_VALUE;
extern const int PENDING_MUTATIONS_NOT_ALLOWED;
}

namespace ServerSetting
@@ -8192,18 +8195,54 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &

ops.emplace_back(zkutil::makeCreateRequest(partition_exports_path, "", zkutil::CreateMode::Persistent));

-auto data_parts_lock = lockParts();
+DataPartsVector parts;

-const auto parts = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id, &data_parts_lock);
{
auto data_parts_lock = lockParts();
Collaborator Author

Either `getMutationsSnapshot` or `getAlterConversionsForPart` grabs a lock. Putting this in a scope to avoid deadlocks.

parts = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id, &data_parts_lock);
}
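
The scoping pattern used above (copy what is needed while holding the lock, release it, then call functions that take the same lock themselves) can be sketched in isolation. This is a minimal illustration, not ClickHouse code: `PartRegistry` and its methods are hypothetical, and a plain `std::mutex` stands in for the parts lock. Locking a non-recursive `std::mutex` twice from the same thread is undefined behavior and commonly shows up as a deadlock, which is what the inner scope avoids.

```cpp
#include <cassert>
#include <mutex>
#include <set>
#include <string>
#include <vector>

// Hypothetical registry used only to illustrate lock scoping; not a ClickHouse class.
class PartRegistry
{
public:
    void addPart(const std::string & name, bool has_pending_work)
    {
        std::lock_guard<std::mutex> lock(mutex);
        parts.push_back(name);
        if (has_pending_work)
            pending.insert(name);
    }

    // Acquires the mutex on its own, so callers must not already hold it.
    bool hasPendingWork(const std::string & name) const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return pending.count(name) != 0;
    }

    std::vector<std::string> exportableParts() const
    {
        std::vector<std::string> snapshot;
        {
            // Hold the lock only long enough to copy the list of parts.
            std::lock_guard<std::mutex> lock(mutex);
            snapshot = parts;
        } // The lock is released here, before any call that locks again.

        std::vector<std::string> result;
        for (const auto & name : snapshot)
            if (!hasPendingWork(name)) // Safe: the mutex is not held at this point.
                result.push_back(name);

        assert(result.size() <= snapshot.size());
        return result;
    }

private:
    mutable std::mutex mutex;
    std::vector<std::string> parts;
    std::set<std::string> pending;
};
```

The trade-off is that the copied list can become stale once the lock is released; the sketch accepts that, and whether it is acceptable in the real code depends on guarantees not shown in this diff.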

if (parts.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Partition {} doesn't exist", partition_id);
}

const bool throw_on_pending_mutations = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_mutations];
const bool throw_on_pending_patch_parts = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_patch_parts];

MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params
{
.metadata_version = getInMemoryMetadataPtr()->getMetadataVersion(),
.min_part_metadata_version = MergeTreeData::getMinMetadataVersion(parts),
.need_data_mutations = throw_on_pending_mutations,
.need_alter_mutations = throw_on_pending_mutations || throw_on_pending_patch_parts,
.need_patch_parts = throw_on_pending_patch_parts,
};

const auto mutations_snapshot = getMutationsSnapshot(mutations_snapshot_params);

std::vector<String> part_names;
for (const auto & part : parts)
{
const auto alter_conversions = getAlterConversionsForPart(part, mutations_snapshot, query_context);

/// re-check `throw_on_pending_mutations` because `pending_mutations` might have been filled due to `throw_on_pending_patch_parts`
if (alter_conversions->hasMutations() && throw_on_pending_mutations)
{
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
"Partition {} can not be exported because the part {} has pending mutations. Either wait for the mutations to be applied or set `export_merge_tree_part_throw_on_pending_mutations` to false",
partition_id,
part->name);
}

if (alter_conversions->hasPatches())
{
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
"Partition {} can not be exported because the part {} has pending patch parts. Either wait for the patch parts to be applied or set `export_merge_tree_part_throw_on_pending_patch_parts` to false",
partition_id,
part->name);
}

part_names.push_back(part->name);
}
