-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathquery_task_result.rs
More file actions
108 lines (99 loc) · 3.6 KB
/
query_task_result.rs
File metadata and controls
108 lines (99 loc) · 3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use super::error as ex_error;
use super::error::Result;
use super::error_code::ErrorCode;
use super::models::QueryResult;
use super::query_types::ExecutionStatus;
use super::snowflake_error::SnowflakeError;
use snafu::ResultExt;
use state_store::QueryMetric;
use tokio::task::JoinError;
use uuid::Uuid;
// pub type TaskFuture = tokio::task::JoinHandle<std::result::Result<QueryResult, Error>>;
/// Outcome of a query execution task together with the status and error code
/// recorded for it.
pub struct ExecutionTaskResult {
/// The query result on success, or the error describing why the query did not complete.
pub result: Result<QueryResult>,
/// Execution status recorded for the query.
pub execution_status: ExecutionStatus,
/// Error code recorded for the query, if any; the constructors in this file
/// leave it as `None` only when `result` is `Ok`.
pub error_code: Option<ErrorCode>,
}
impl ExecutionTaskResult {
#[must_use]
pub fn from_query_result(query_id: Uuid, result: Result<QueryResult>) -> Self {
let execution_status = result
.as_ref()
.map_or_else(|_| ExecutionStatus::Fail, |_| ExecutionStatus::Success);
let error_code = match result.as_ref() {
Ok(_) => None,
Err(err) => Some(SnowflakeError::from_executor_error(err).error_code()),
};
// set query execution status to successful or failed
Self {
result: result.context(ex_error::QueryExecutionSnafu { query_id }),
execution_status,
error_code,
}
}
#[must_use]
pub fn from_query_limit_exceeded(query_id: Uuid) -> Self {
Self {
result: ex_error::ConcurrencyLimitSnafu
.fail()
.context(ex_error::QueryExecutionSnafu { query_id }),
execution_status: ExecutionStatus::Incident,
error_code: Some(ErrorCode::LimitExceeded),
}
}
#[must_use]
pub fn from_failed_query_task(query_id: Uuid, task_error: JoinError) -> Self {
Self {
result: Err(task_error)
.context(ex_error::QuerySubtaskJoinSnafu)
.context(ex_error::QueryExecutionSnafu { query_id }),
execution_status: ExecutionStatus::Incident,
error_code: Some(ErrorCode::QueryTask),
}
}
#[must_use]
pub fn from_cancelled_query_task(query_id: Uuid) -> Self {
Self {
result: ex_error::QueryCancelledSnafu { query_id }
.fail()
.context(ex_error::QueryExecutionSnafu { query_id }),
execution_status: ExecutionStatus::Fail,
error_code: Some(ErrorCode::Cancelled),
}
}
#[must_use]
pub fn from_timeout_query_task(query_id: Uuid) -> Self {
Self {
result: ex_error::QueryTimeoutSnafu
.fail()
.context(ex_error::QueryExecutionSnafu { query_id }),
execution_status: ExecutionStatus::Fail,
error_code: Some(ErrorCode::Timeout),
}
}
#[cfg(feature = "state-store-query")]
pub fn assign_query_attributes(&self, query: &mut state_store::Query) {
query.set_execution_status(self.execution_status);
if let Some(error_code) = self.error_code {
query.set_error_code(error_code.to_string());
}
if let Err(err) = &self.result {
query.set_error_message(err.to_string());
}
// Collect result metrics
if let Ok(res) = &self.result {
query.set_query_metrics(
res.metrics
.iter()
.map(|metric| QueryMetric {
node_id: metric.node_id,
parent_node_id: metric.parent_node_id,
operator: metric.operator.clone(),
metrics: metric.metrics.clone(),
})
.collect(),
);
}
query.set_end_time();
}
}