10 changes: 10 additions & 0 deletions src/Common/ThreadStatus.h
@@ -116,6 +116,16 @@ class ThreadGroup
void attachQueryForLog(const String & query_, UInt64 normalized_hash = 0);
void attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue);

/// Override the cancellation predicate. All threads that subsequently attach to this
/// group via ThreadGroupSwitcher inherit the predicate in their local_data, making
/// isQueryCanceled() reflect task-level cancellation without a process-list entry.
/// Required for part and partition export cancellation during an S3 outage.
void setCancelPredicate(QueryIsCanceledPredicate predicate)
{
std::lock_guard lock(mutex);
shared_data.query_is_canceled_predicate = std::move(predicate);
}

/// When new query starts, new thread group is created for it, current thread becomes master thread of the query
static ThreadGroupPtr createForQuery(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});

26 changes: 24 additions & 2 deletions src/Storages/MergeTree/ExportPartTask.cpp
@@ -175,6 +175,28 @@ bool ExportPartTask::executeStep()
manifest.query_id,
local_context);

/*
This is a hack to fix the issue where, when S3 is down, ClickHouse keeps retrying S3 requests deep
in the AWS SDK and never checks the `isCancelled()` flag. That prevents the task from being killed / cancelled. It also prevents the table from being dropped.

Merges and mutations don't suffer from this problem because they don't make requests to S3 :). Select statements
do make requests to S3, but the cancel predicate is properly set up for regular queries.

I think this is the first time we have a background operation that makes requests to S3, so we need to connect the dots.

The simplest way is this one, and given the release timeline, I am opting for it.
*/
(*exports_list_entry)->thread_group->setCancelPredicate(
[weak_this = weak_from_this()]() -> bool
{
if (auto shared_this = weak_this.lock())
{
return shared_this->isCancelled();
}

return true;
});

Comment on lines +189 to +199
Member:

Are you sure that the lifetime of the task exceeds the lifetime of the thread group at all times? Maybe capture a weak pointer instead of `this`?

Collaborator Author:

This did come to mind when I was writing the code. ThreadGroup is a member of ExportListEntry, which is tied to the lifetime of this task, so I assumed it would stay valid. At the same time, ThreadGroupPtr is a shared_ptr, and it gets passed down to the pipeline and ThreadGroupSwitcher, so I am not actually sure about it. Too much wizardry.

Maybe a weak_ptr would indeed be safer.
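
To make the lifetime concern in this thread concrete, here is a minimal standalone sketch of the weak-pointer capture. It is not ClickHouse code: `Task`, `Group`, `CancelPredicate`, `makeCancelPredicate` and `runLifetimeDemo` are hypothetical stand-ins for `ExportPartTask`, `ThreadGroup` and `QueryIsCanceledPredicate`, and the sketch assumes the group can outlive the task because it is held through a `shared_ptr`.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical stand-in for the predicate type stored in the thread group.
using CancelPredicate = std::function<bool()>;

// Hypothetical stand-in for the export task: only the cancellation flag is modelled.
class Task : public std::enable_shared_from_this<Task>
{
public:
    void cancel() { cancelled.store(true); }
    bool isCancelled() const { return cancelled.load(); }

    // Builds a predicate that never dereferences a destroyed task.
    // Assumes the object is already owned by a std::shared_ptr.
    CancelPredicate makeCancelPredicate()
    {
        return [weak_this = weak_from_this()]() -> bool
        {
            if (auto shared_this = weak_this.lock())
                return shared_this->isCancelled();
            return true; // task is gone: report "cancelled" so in-flight work stops
        };
    }

private:
    std::atomic<bool> cancelled{false};
};

// Hypothetical stand-in for the thread group, which may outlive the task.
struct Group
{
    CancelPredicate predicate;
};

// Walks through the three states the predicate can observe.
void runLifetimeDemo()
{
    auto group = std::make_shared<Group>();
    auto task = std::make_shared<Task>();
    group->predicate = task->makeCancelPredicate();

    assert(!group->predicate()); // task alive, not cancelled
    task->cancel();
    assert(group->predicate());  // task alive, cancelled
    task.reset();
    assert(group->predicate());  // task destroyed, group still alive
}
```

Capturing a raw `this` in the same lambda would make the last call undefined behaviour whenever the group outlives the task. With the weak capture, the lambda holds a strong reference only for the duration of each call.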

SinkToStoragePtr sink;

const auto new_file_path_callback = [&exports_list_entry](const std::string & file_path)
@@ -185,6 +207,8 @@ bool ExportPartTask::executeStep()

try
{
ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ThreadName::EXPORT_PART);

const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context);

sink = destination_storage->import(
@@ -232,8 +256,6 @@ bool ExportPartTask::executeStep()
local_context,
getLogger("ExportPartition"));

ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ThreadName::EXPORT_PART);

/// We need to support exporting materialized and alias columns to object storage. For some reason, object storage engines don't support them.
/// This is a hack that materializes the columns before the export so they can be exported to tables that have matching columns
materializeSpecialColumns(plan_for_part.getCurrentHeader(), metadata_snapshot, local_context, plan_for_part);
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/ExportPartTask.h
@@ -7,7 +7,7 @@
namespace DB
{

class ExportPartTask : public IExecutableTask
class ExportPartTask : public IExecutableTask, public std::enable_shared_from_this<ExportPartTask>
{
public:
explicit ExportPartTask(
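
One property of `std::enable_shared_from_this` matters for the predicate in `ExportPartTask.cpp`: `weak_from_this()` returns an empty `weak_ptr` when the object is not owned by a `std::shared_ptr`, so `lock()` fails and the lambda would report cancellation immediately. The diff does not show how `ExportPartTask` instances are created, so the sketch below only illustrates the standard-library behaviour. `SketchTask` and both helper functions are hypothetical names.

```cpp
#include <memory>

// Hypothetical stand-in for a task type that derives from enable_shared_from_this.
struct SketchTask : std::enable_shared_from_this<SketchTask>
{
    // True when this object is currently owned by at least one std::shared_ptr.
    bool isSharedOwned() { return !weak_from_this().expired(); }
};

// A stack-allocated object has no owning shared_ptr, so weak_from_this() is empty.
bool stackTaskIsSharedOwned()
{
    SketchTask on_stack;
    return on_stack.isSharedOwned();
}

// An object created through make_shared is tracked, so weak_from_this() can be locked.
bool sharedTaskIsSharedOwned()
{
    auto task = std::make_shared<SketchTask>();
    return task->isSharedOwned();
}
```

The same empty result applies inside the constructor, before any `shared_ptr` has taken ownership, so the predicate is best installed after construction, as the diff does in `executeStep()`.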