-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathevent_handler.py
More file actions
75 lines (58 loc) · 2.74 KB
/
event_handler.py
File metadata and controls
75 lines (58 loc) · 2.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import logging
from SpiffWorkflow.spiff.parser import SpiffBpmnParser
from SpiffWorkflow.spiff.specs.defaults import UserTask, ManualTask, ServiceTask
from SpiffWorkflow.spiff.serializer.config import SPIFF_CONFIG
from SpiffWorkflow.bpmn.specs.mixins.none_task import NoneTask
from SpiffWorkflow.bpmn.script_engine import TaskDataEnvironment
from SpiffWorkflow.spiff.specs.event_definitions import ErrorEventDefinition
from SpiffWorkflow.spiff.parser.task_spec import ServiceTaskParser
from SpiffWorkflow.bpmn.parser.util import full_tag
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException
from SpiffWorkflow.bpmn import BpmnEvent
from ..serializer.file import FileSerializer
from ..engine import BpmnEngine, Instance
from .curses_handlers import UserTaskHandler, ManualTaskHandler
logger = logging.getLogger('spiff_engine')
logger.setLevel(logging.INFO)
spiff_logger = logging.getLogger('spiff')
spiff_logger.setLevel(logging.INFO)
dirname = 'data'
FileSerializer.initialize(dirname)
handlers = {
UserTask: UserTaskHandler,
ManualTask: ManualTaskHandler,
NoneTask: ManualTaskHandler,
}
class EventHandlingServiceTask(ServiceTask):
def _execute(self, my_task):
script_engine = my_task.workflow.script_engine
# The param also has a type, but I don't need it
params = dict((name, script_engine.evaluate(my_task, p['value'])) for name, p in self.operation_params.items())
try:
result = script_engine.call_service(
my_task,
operation_name=self.operation_name,
operation_params=params
)
my_task.data[self.result_variable] = result
return True
except FileNotFoundError as exc:
event_definition = ErrorEventDefinition('file_not_found', code='1')
event = BpmnEvent(event_definition, payload=params['filename'])
my_task.workflow.top_workflow.catch(event)
return False
except Exception as exc:
raise WorkflowTaskException('Service Task execution error', task=my_task, exception=exc)
class ServiceTaskEnvironment(TaskDataEnvironment):
def call_service(self, context, operation_name, operation_params):
if operation_name == 'read_file':
return open(operation_params['filename']).read()
else:
raise ValueError('Unknown Service')
parser = SpiffBpmnParser()
parser.OVERRIDE_PARSER_CLASSES[full_tag('serviceTask')] = (ServiceTaskParser, EventHandlingServiceTask)
SPIFF_CONFIG[EventHandlingServiceTask] = SPIFF_CONFIG.pop(ServiceTask)
registry = FileSerializer.configure(SPIFF_CONFIG)
serializer = FileSerializer(dirname, registry=registry)
script_env = ServiceTaskEnvironment()
engine = BpmnEngine(parser, serializer, script_env)