-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Expand file tree
/
Copy pathdapr_kernel_process_context.py
More file actions
92 lines (74 loc) · 3.64 KB
/
dapr_kernel_process_context.py
File metadata and controls
92 lines (74 loc) · 3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# Copyright (c) Microsoft. All rights reserved.
import uuid
from collections.abc import Sequence
from dapr.actor import ActorId, ActorProxy
from semantic_kernel.processes.dapr_runtime.actors.process_actor import ProcessActor
from semantic_kernel.processes.dapr_runtime.dapr_process_info import DaprProcessInfo
from semantic_kernel.processes.dapr_runtime.interfaces.process_interface import ProcessInterface
from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess
from semantic_kernel.processes.kernel_process.kernel_process_event import KernelProcessEvent
from semantic_kernel.processes.step_utils import DEFAULT_ALLOWED_MODULE_PREFIXES
from semantic_kernel.utils.feature_stage_decorator import experimental
@experimental
class DaprKernelProcessContext:
"""A Dapr kernel process context."""
dapr_process: ProcessInterface
process: KernelProcess
max_supersteps: int = 100
def __init__(
self,
process: KernelProcess,
max_supersteps: int | None = None,
allowed_module_prefixes: Sequence[str] | None = DEFAULT_ALLOWED_MODULE_PREFIXES,
) -> None:
"""Initialize a new instance of DaprKernelProcessContext.
Args:
process: The kernel process to start.
max_supersteps: The maximum number of supersteps. This is the total number of times process steps will run.
Defaults to None, and thus the process will run its steps 100 times.
allowed_module_prefixes: Sequence of module prefixes that are allowed
for step class loading. Defaults to ("semantic_kernel.",). Pass
None to allow any module (not recommended for production).
"""
if process.state.name is None:
raise ValueError("Process state name must not be None")
if process.state.id is None or process.state.id == "":
process.state.id = str(uuid.uuid4().hex)
if max_supersteps is not None:
self.max_supersteps = max_supersteps
self.allowed_module_prefixes = allowed_module_prefixes
self.process = process
process_id = ActorId(process.state.id)
self.dapr_process = ActorProxy.create( # type: ignore
actor_type=f"{ProcessActor.__name__}",
actor_id=process_id,
actor_interface=ProcessInterface,
)
async def start_with_event(self, initial_event: KernelProcessEvent) -> None:
"""Starts the process with the provided initial event."""
dapr_process = DaprProcessInfo.from_kernel_process(self.process)
dapr_process_dict = dapr_process.model_dump_json()
payload = {
"process_info": dapr_process_dict,
"parent_process_id": None,
"max_supersteps": self.max_supersteps,
}
await self.dapr_process.initialize_process(payload)
initial_event_json = initial_event.model_dump_json()
await self.dapr_process.run_once(initial_event_json)
async def send_event(self, event: KernelProcessEvent) -> None:
"""Sends an event to the process."""
await self.dapr_process.send_message(event)
async def stop(self) -> None:
"""Stops the process."""
await self.dapr_process.stop()
async def get_state(self) -> KernelProcess:
"""Retrieves the current state of the process.
Returns:
The current state of the process.
"""
raw_process_info = await self.dapr_process.get_process_info()
dapr_process_info = DaprProcessInfo.model_validate(raw_process_info)
return dapr_process_info.to_kernel_process(
allowed_module_prefixes=self.allowed_module_prefixes,
)