-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathtask_processor.rb
More file actions
195 lines (159 loc) · 6.98 KB
/
task_processor.rb
File metadata and controls
195 lines (159 loc) · 6.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
require 'temporal/error_handler'
require 'temporal/errors'
require 'temporal/metadata'
require 'temporal/workflow/executor'
require 'temporal/workflow/history'
require 'temporal/workflow/stack_trace_tracker'
require 'temporal/metric_keys'
module Temporal
class Workflow
class TaskProcessor
# Thin wrapper around a raw query received on a workflow task.
# Decodes query arguments from their payload form via Concerns::Payloads
# (required elsewhere in the gem — presumably loaded before this class is used).
Query = Struct.new(:query) do
  include Concerns::Payloads

  # Name of the query as registered by the workflow.
  def query_type
    query.query_type
  end

  # Query arguments decoded from their serialized payload representation.
  def query_args
    from_query_payloads(query.query_args)
  end
end
# A workflow task is explicitly failed back to the server only while
# task.attempt <= this value; later attempts are left to time out
# (see #fail_task for the rationale).
MAX_FAILED_ATTEMPTS = 1
# Hash key under which the result of a deprecated (legacy) query task is kept.
LEGACY_QUERY_KEY = :legacy_query
# Captures everything needed to process one workflow task.
#
# @param task the polled workflow task response from the server
# @param namespace [String] Temporal namespace the task belongs to
# @param workflow_lookup registry used to resolve the workflow class by name
# @param middleware_chain middleware to wrap around task execution
# @param config worker configuration
# @param binary_checksum [String] identifies this worker build to the server
def initialize(task, namespace, workflow_lookup, middleware_chain, config, binary_checksum)
  @task = task
  @namespace = namespace
  @metadata = Metadata.generate_workflow_task_metadata(task, namespace)
  @task_token = task.task_token
  @workflow_name = task.workflow_type.name
  # May be nil when the workflow is not registered; #process raises in that case.
  @workflow_class = workflow_lookup.find(@workflow_name)
  @middleware_chain = middleware_chain
  @config = config
  @binary_checksum = binary_checksum
end
# Entry point: replays/advances the workflow via the Executor, then reports
# the outcome (commands and/or query results) back to the server.
# Emits a queue-time metric up front and a latency metric/log in `ensure`,
# covering both success and failure paths.
def process
  start_time = Time.now
  Temporal.logger.debug("Processing Workflow task", metadata.to_h)
  # Time the task spent waiting in the task queue before pickup.
  Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, queue_time_ms, workflow: workflow_name, namespace: namespace)
  if !workflow_class
    raise Temporal::WorkflowNotRegistered, 'Workflow is not registered with this worker'
  end
  history = fetch_full_history
  queries = parse_queries
  # We only need to track the stack trace if this is a stack trace query
  track_stack_trace = queries.values.map(&:query_type).include?(StackTraceTracker::STACK_TRACE_QUERY_NAME)
  # TODO: For sticky workflows we need to cache the Executor instance
  executor = Workflow::Executor.new(workflow_class, history, metadata, config, track_stack_trace)
  # Middleware wraps the history replay; the block's return value is the
  # list of commands produced by this pass.
  commands = middleware_chain.invoke(metadata) do
    executor.run
  end
  # Queries are answered against the workflow state left by the replay above.
  query_results = executor.process_queries(queries)
  if legacy_query_task?
    # Deprecated query-only task: respond on the dedicated query endpoint.
    complete_query(query_results[LEGACY_QUERY_KEY])
  else
    complete_task(commands, query_results)
  end
rescue StandardError => error
  Temporal::ErrorHandler.handle(error, config, metadata: metadata)
  fail_task(error)
ensure
  time_diff_ms = ((Time.now - start_time) * 1000).round
  Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_LATENCY, time_diff_ms, workflow: workflow_name, namespace: namespace)
  Temporal.logger.debug("Workflow task processed", metadata.to_h.merge(execution_time: time_diff_ms))
end
private

# Collaborators and task attributes captured in #initialize.
attr_reader :task, :namespace, :task_token, :workflow_name, :workflow_class,
            :middleware_chain, :metadata, :config, :binary_checksum
# Lazily-built, memoized connection to the Temporal service.
def connection
  return @connection if @connection

  @connection = Temporal::Connection.generate(config.for_connection)
end
# Milliseconds the task spent queued before a worker picked it up
# (started minus scheduled), rounded to the nearest whole millisecond.
def queue_time_ms
  waiting_seconds = task.started_time.to_f - task.scheduled_time.to_f
  (waiting_seconds * 1_000).round
end
# Assembles the complete event history for this workflow execution,
# following pagination tokens until the server reports no further pages.
#
# @return [Workflow::History]
# @raise [Temporal::UnexpectedResponse] when a page arrives with no events,
#   which would otherwise make this loop spin forever
def fetch_full_history
  events = task.history.events.to_a
  page_token = task.next_page_token

  until page_token.empty?
    page = connection.get_workflow_execution_history(
      namespace: namespace,
      workflow_id: task.workflow_execution.workflow_id,
      run_id: task.workflow_execution.run_id,
      next_page_token: page_token
    )
    raise Temporal::UnexpectedResponse, 'Received empty history page' if page.history.events.empty?

    events += page.history.events.to_a
    page_token = page.next_page_token
  end

  Workflow::History.new(events)
end
# Whether this task is a deprecated ("legacy") query-only task, i.e. it
# carries a single query directly on the task instead of in `queries`.
def legacy_query_task?
  task.query ? true : false
end
# Normalizes the queries on the task into a hash of id => Query wrapper.
# A deprecated query-only task has a single query stored under
# LEGACY_QUERY_KEY; otherwise each server-assigned query id maps to its
# wrapped query.
def parse_queries
  # Support for deprecated query style
  return { LEGACY_QUERY_KEY => Query.new(task.query) } if legacy_query_task?

  task.queries.map { |query_id, query| [query_id, Query.new(query)] }.to_h
end
# Reports the commands and query results produced by this task back to
# the server.
def complete_task(commands, query_results)
  Temporal.logger.info("Workflow task completed", metadata.to_h)

  connection.respond_workflow_task_completed(
    namespace: namespace,
    task_token: task_token,
    commands: commands,
    binary_checksum: binary_checksum,
    query_results: query_results
  )
rescue StandardError => e
  # Deliberately swallowed (after logging/reporting) so #process does not
  # also fail the task. A common cause is the current workflow task being
  # invalidated by a concurrent signal arriving while it tries to complete
  # the workflow — in that case the task must not be failed. Not failing it
  # still gets it retried after a delay, which is also the behavior we want
  # for ephemeral issues such as network outages.
  Temporal.logger.error("Unable to complete the workflow task", metadata.to_h.merge(error: e.inspect))
  Temporal::ErrorHandler.handle(e, config, metadata: metadata)
end
# Responds to a deprecated (legacy) query-only task on the dedicated
# query endpoint. Respond errors are logged and swallowed so #process
# does not also fail the task.
def complete_query(result)
  Temporal.logger.info("Workflow Query task completed", metadata.to_h)

  connection.respond_query_task_completed(
    namespace: namespace,
    task_token: task_token,
    query_result: result
  )
rescue StandardError => e
  Temporal.logger.error("Unable to complete a query", metadata.to_h.merge(error: e.inspect))
  Temporal::ErrorHandler.handle(e, config, metadata: metadata)
end
# Records metrics/logs for a failed workflow task and, on the first
# attempt only, reports the failure back to the server. Reporting is
# best-effort: respond errors are logged and swallowed.
#
# @param error [StandardError] the error raised while processing the task
def fail_task(error)
  Temporal.metrics.increment(Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED, workflow: workflow_name, namespace: namespace)
  Temporal.logger.error('Workflow task failed', metadata.to_h.merge(error: error.inspect))
  # An exception that was constructed but never raised has a nil backtrace;
  # guard so the failure handler itself cannot crash here.
  Temporal.logger.debug(error.backtrace.join("\n")) if error.backtrace
  # Only fail the workflow task on the first attempt. Subsequent failures of the same workflow task
  # should timeout. This is to avoid spinning on the failed workflow task as the service doesn't
  # yet exponentially backoff on retries.
  return if task.attempt > MAX_FAILED_ATTEMPTS

  connection.respond_workflow_task_failed(
    namespace: namespace,
    task_token: task_token,
    cause: Temporal::Api::Enums::V1::WorkflowTaskFailedCause::WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE,
    exception: error,
    binary_checksum: binary_checksum
  )
rescue StandardError => respond_error
  # Use a distinct name: the original rescue shadowed the `error` parameter,
  # making the workflow failure and the RPC failure indistinguishable here.
  Temporal.logger.error("Unable to fail Workflow task", metadata.to_h.merge(error: respond_error.inspect))
  Temporal::ErrorHandler.handle(respond_error, config, metadata: metadata)
end
end
end
end