-
-
Notifications
You must be signed in to change notification settings - Fork 66
Expand file tree
/
Copy pathChildWorkflowStub.php
More file actions
121 lines (102 loc) · 4.19 KB
/
ChildWorkflowStub.php
File metadata and controls
121 lines (102 loc) · 4.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
<?php
declare(strict_types=1);
namespace Workflow;
use function React\Promise\all;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use function React\Promise\resolve;
use RuntimeException;
use Throwable;
use Workflow\Exceptions\TransitionNotFound;
use Workflow\Serializers\Serializer;
use Workflow\States\WorkflowFailedStatus;
/**
 * Static entry points a parent workflow uses to schedule child workflows
 * and to replay outcomes that were already recorded in its log.
 *
 * Every call to make() consumes one position ($context->index) of the
 * current workflow context, whether it returns, throws, or dispatches.
 */
final class ChildWorkflowStub
{
    /**
     * Combine several promises into a single promise using React's all().
     *
     * @param iterable $promises promises to wait for; the iterable is unpacked into an array first
     */
    public static function all(iterable $promises): PromiseInterface
    {
        $pending = [...$promises];

        return all($pending);
    }

    /**
     * Schedule (or replay) the child workflow at the current context index.
     *
     * Outcomes, in order of precedence:
     *  - a log entry exists with a plain result: a promise resolved with that result is returned;
     *  - a log entry exists with a serialized throwable: the throwable is rebuilt and thrown,
     *    unless the stored child is found in a non-failed state, in which case the log entry
     *    is deleted and the child is dispatched again;
     *  - otherwise the child is resumed or started (skipped while replaying) and the promise of
     *    a fresh Deferred is returned; that Deferred is not resolved anywhere in this method.
     *
     * When WorkflowStub is faked and a mock is registered for $workflow, a log entry holding
     * the mocked result is created first, so the call then follows the "log entry exists" paths.
     *
     * @param mixed $workflow     child workflow identifier; used as a key into the mocks array
     *                            and stored as the log's 'class' (presumably a class name - confirm)
     * @param mixed ...$arguments arguments forwarded to the child workflow
     *
     * @throws Throwable        the recorded exception, rebuilt from the log entry
     * @throws RuntimeException when the recorded exception class cannot be instantiated
     */
    public static function make($workflow, ...$arguments): PromiseInterface
    {
        $context = WorkflowStub::getContext();
        $log = $context->storedWorkflow->findLogByIndex($context->index);

        if (WorkflowStub::faked()) {
            $mocks = WorkflowStub::mocks();

            if (! $log && array_key_exists($workflow, $mocks)) {
                $mock = $mocks[$workflow];

                // A callable mock is invoked with the context and the call arguments;
                // any other value is recorded as the result as-is.
                $log = $context->storedWorkflow->createLog([
                    'index' => $context->index,
                    'now' => $context->now,
                    'class' => $workflow,
                    'result' => Serializer::serialize(
                        is_callable($mock) ? $mock($context, ...$arguments) : $mock
                    ),
                ]);
                WorkflowStub::recordDispatched($workflow, $arguments);
            }
        }

        if ($log) {
            $result = Serializer::unserialize($log->result);

            if (! self::isSerializedThrowable($result)) {
                self::advance($context);

                return resolve($result);
            }

            // The stored child is only looked up when the context is not replaying.
            $storedChild = $context->replaying ? null : self::findStoredChild($context);
            $childNoLongerFailed = $storedChild
                && $storedChild->status::class !== WorkflowFailedStatus::class;

            if (! $childNoLongerFailed) {
                self::advance($context);

                throw self::restoreThrowable($result);
            }

            // A failure was logged, but the stored child is not in the failed state
            // (presumably it was retried - confirm). Drop the stale entry and fall
            // through so the child is dispatched again below.
            $log->delete();
        }

        if (! $context->replaying) {
            self::dispatchChild($context, $workflow, $arguments);
        }

        self::advance($context);

        return (new Deferred())->promise();
    }

    /**
     * Move the context to the next index and store it back on WorkflowStub.
     */
    private static function advance(object $context): void
    {
        ++$context->index;
        WorkflowStub::setContext($context);
    }

    /**
     * Fetch the first child of the context's stored workflow whose pivot
     * 'parent_index' equals the current context index.
     *
     * @return mixed whatever first() yields for that query
     */
    private static function findStoredChild(object $context)
    {
        return $context->storedWorkflow->children()
            ->wherePivot('parent_index', $context->index)
            ->first();
    }

    /**
     * Tell whether an unserialized log result is an array whose 'class' entry
     * names a subclass (or implementation) of Throwable.
     */
    private static function isSerializedThrowable(mixed $value): bool
    {
        if (! is_array($value) || ! array_key_exists('class', $value)) {
            return false;
        }

        return is_subclass_of($value['class'], Throwable::class);
    }

    /**
     * Rebuild the throwable described by a log result using its 'class',
     * 'message' and 'code' entries.
     *
     * @throws RuntimeException wrapping the instantiation error when the class
     *                          cannot be constructed with (message, code)
     */
    private static function restoreThrowable(array $payload): Throwable
    {
        try {
            return new $payload['class']($payload['message'] ?? '', (int) ($payload['code'] ?? 0));
        } catch (Throwable $instantiationFailure) {
            throw new RuntimeException(
                sprintf('[%s] %s', $payload['class'], (string) ($payload['message'] ?? '')),
                (int) ($payload['code'] ?? 0),
                $instantiationFailure
            );
        }
    }

    /**
     * Resume or start the child workflow for the current context index.
     *
     * An already stored child is reused; otherwise a new stub is made for $workflow.
     * When no argument is a WorkflowOptions instance, the parent's options are appended
     * if they set a connection or a queue.
     */
    private static function dispatchChild(object $context, $workflow, array $arguments): void
    {
        $storedChild = self::findStoredChild($context);

        if ($storedChild) {
            $childWorkflow = $storedChild->toWorkflow();
        } else {
            $childWorkflow = WorkflowStub::make($workflow);
        }

        $hasOptions = false;
        foreach ($arguments as $argument) {
            if ($argument instanceof WorkflowOptions) {
                $hasOptions = true;
                break;
            }
        }

        if (! $hasOptions) {
            $options = $context->storedWorkflow->workflowOptions();

            if ($options->connection !== null || $options->queue !== null) {
                $arguments[] = $options;
            }
        }

        if ($childWorkflow->running() && ! $childWorkflow->created()) {
            try {
                $childWorkflow->resume();
            } catch (TransitionNotFound) {
                // already running
            }

            return;
        }

        if (! $childWorkflow->completed()) {
            $childWorkflow->startAsChild($context->storedWorkflow, $context->index, $context->now, ...$arguments);
        }
    }
}