-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathlocal_workflow_context.rb
More file actions
255 lines (206 loc) · 7.68 KB
/
local_workflow_context.rb
File metadata and controls
255 lines (206 loc) · 7.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
require 'securerandom'
require 'temporal/testing/local_activity_context'
require 'temporal/testing/workflow_execution'
require 'temporal/execution_options'
require 'temporal/metadata/activity'
require 'temporal/workflow/future'
require 'temporal/workflow/history/event_target'
require 'temporal/workflow/context_helpers'
module Temporal
module Testing
class LocalWorkflowContext
attr_reader :metadata, :config
def initialize(execution, workflow_id, run_id, disabled_releases, metadata, config = Temporal.config)
@last_event_id = 0
@execution = execution
@run_id = run_id
@workflow_id = workflow_id
@disabled_releases = disabled_releases
@metadata = metadata
@completed = false
@config = config
end
def completed?
@completed
end
def logger
Temporal.logger
end
def headers
metadata.headers
end
def has_release?(change_name)
!disabled_releases.include?(change_name.to_s)
end
def execute_activity(activity_class, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?
event_id = next_event_id
activity_id = options[:activity_id] || event_id
target = Workflow::History::EventTarget.new(event_id, Workflow::History::EventTarget::ACTIVITY_TYPE)
future = Workflow::Future.new(target, self, cancelation_id: activity_id)
execution_options = ExecutionOptions.new(activity_class, options, config.default_execution_options)
metadata = Metadata::Activity.new(
namespace: execution_options.namespace,
id: activity_id,
name: execution_options.name,
task_token: nil,
attempt: 1,
workflow_run_id: run_id,
workflow_id: workflow_id,
workflow_name: self.metadata.name,
headers: execution_options.headers,
heartbeat_details: nil,
scheduled_at: Time.now,
current_attempt_scheduled_at: Time.now,
heartbeat_timeout: 0
)
context = LocalActivityContext.new(metadata)
begin
result = activity_class.execute_in_context(context, input)
rescue StandardError => e
# Capture any failure from running the activity into the future
# instead of raising immediately in order to match the behavior of
# running against a Temporal server.
future.fail(e)
else
if context.async?
execution.register_future(context.async_token, future)
else
# Fulfill the future straight away for non-async activities
future.set(result)
end
end
future
end
def execute_activity!(activity_class, *input, **args)
future = execute_activity(activity_class, *input, **args)
result_or_exception = future.get
raise result_or_exception if future.failed?
result_or_exception
end
def execute_local_activity(activity_class, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?
execution_options = ExecutionOptions.new(activity_class, options, config.default_execution_options)
activity_id = options[:activity_id] || SecureRandom.uuid
metadata = Metadata::Activity.new(
namespace: execution_options.namespace,
id: activity_id,
name: execution_options.name,
task_token: nil,
attempt: 1,
workflow_run_id: run_id,
workflow_id: workflow_id,
workflow_name: self.metadata.name,
headers: execution_options.headers,
heartbeat_details: nil,
scheduled_at: Time.now,
current_attempt_scheduled_at: Time.now,
heartbeat_timeout: 0
)
context = LocalActivityContext.new(metadata)
activity_class.execute_in_context(context, input)
end
def execute_workflow(workflow_class, *input, **args)
raise NotImplementedError, 'not yet available for testing'
end
def execute_workflow!(workflow_class, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?
execution = WorkflowExecution.new
workflow_id = SecureRandom.uuid
run_id = SecureRandom.uuid
execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options)
child_metadata = Temporal::Metadata::Workflow.new(
namespace: execution_options.namespace,
id: workflow_id,
name: execution_options.name, # Workflow class name
run_id: run_id,
parent_id: @workflow_id,
parent_run_id: @run_id,
attempt: 1,
task_queue: execution_options.task_queue,
headers: execution_options.headers,
run_started_at: Time.now,
memo: {},
)
context = Temporal::Testing::LocalWorkflowContext.new(
execution, workflow_id, run_id, workflow_class.disabled_releases, child_metadata
)
workflow_class.execute_in_context(context, input)
end
def side_effect(&block)
block.call
end
def sleep(timeout)
::Kernel.sleep timeout
end
def start_timer(timeout, timer_id = nil)
raise NotImplementedError, 'not yet available for testing'
end
def cancel_timer(timer_id)
raise NotImplementedError, 'not yet available for testing'
end
def complete(result = nil)
completed!
result
end
def fail(exception)
completed!
raise exception
end
def continue_as_new(*input, **args)
raise NotImplementedError, 'not yet available for testing'
end
def wait_for_all(*futures)
futures.each(&:wait)
return
end
def wait_for_any(*futures)
return if futures.empty?
Fiber.yield while futures.none?(&:finished?)
return
end
def wait_until(&unblock_condition)
raise 'You must pass an unblock condition block to wait_for' if unblock_condition.nil?
Fiber.yield until unblock_condition.call
return
end
def now
Time.now
end
def on_signal(signal_name = nil, &block)
raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on'
end
def on_query(query, &block)
raise NotImplementedError, 'Queries are not available when Temporal::Testing.local! is on'
end
def cancel_activity(activity_id)
raise NotImplementedError, 'Cancel is not available when Temporal::Testing.local! is on'
end
def cancel(target, cancelation_id)
raise NotImplementedError, 'Cancel is not available when Temporal::Testing.local! is on'
end
def signal_external_workflow(workflow, signal, workflow_id, run_id = nil, input = nil, namespace: nil, child_workflow_only: false)
raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on'
end
def upsert_search_attributes(search_attributes)
search_attributes = Temporal::Workflow::Context::Helpers.process_search_attributes(search_attributes)
if search_attributes.empty?
raise ArgumentError, "Cannot upsert an empty hash for search_attributes, as this would do nothing."
end
execution.upsert_search_attributes(search_attributes)
end
private
attr_reader :execution, :run_id, :workflow_id, :disabled_releases
def completed!
@completed = true
end
def next_event_id
@last_event_id += 1
@last_event_id
end
end
end
end