-
Notifications
You must be signed in to change notification settings - Fork 323
Expand file tree
/
Copy pathservice.py
More file actions
341 lines (303 loc) · 14.6 KB
/
service.py
File metadata and controls
341 lines (303 loc) · 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
import base64
import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, cast
from langchain_core.exceptions import OutputParserException
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage
from langchain_core.prompts import PromptTemplate
from pydantic import ValidationError
from workflow_use.builder.prompts import WORKFLOW_BUILDER_PROMPT_TEMPLATE
from workflow_use.controller.service import WorkflowController
from workflow_use.schema.views import WorkflowDefinitionSchema
logger = logging.getLogger(__name__)
class BuilderService:
    """
    Service responsible for building executable workflow JSON definitions
    from recorded browser session events using an LLM.
    """

    # Event types treated as direct user interactions when scanning a recording.
    _USER_INTERACTION_TYPES = (
        "input",
        "click",
        "scroll",
        "select_change",
        "key_press",
    )

    # Captures the payload of a ```json fenced block in free-form LLM output.
    _JSON_FENCE_RE = re.compile(r"```json\s*([\s\S]*?)\s*```")

    def __init__(self, llm: BaseChatModel):
        """
        Initializes the BuilderService.

        Args:
            llm: A LangChain BaseChatModel instance configured for use.
                It should ideally support vision capabilities if screenshots are used.

        Raises:
            ValueError: If ``llm`` is None.
        """
        if llm is None:
            raise ValueError("A BaseChatModel instance must be provided.")

        # Configure the LLM to return structured output based on the Pydantic model.
        try:
            # Specify method="function_calling" for better compatibility.
            self.llm_structured = llm.with_structured_output(
                WorkflowDefinitionSchema, method="function_calling"
            )
        except NotImplementedError:
            logger.warning(
                "LLM does not support structured output natively. Falling back."
            )
            # Keep the plain model; its text output is parsed manually in
            # build_workflow via _parse_llm_output_to_workflow.
            self.llm_structured = llm

        self.prompt_template = PromptTemplate.from_template(
            WORKFLOW_BUILDER_PROMPT_TEMPLATE
        )
        self.actions_markdown = self._get_available_actions_markdown()
        logger.info("BuilderService initialized.")

    def _get_available_actions_markdown(self) -> str:
        """Return a markdown list of available actions and their schema.

        One bullet is emitted per action registered on a fresh
        ``WorkflowController``, followed by a parameter line built from the
        action's JSON schema (required parameters are marked with ``*``).
        An ``agent`` entry is appended manually when the controller does not
        register an action with that name.
        """
        controller = WorkflowController()
        actions = list(controller.registry.registry.actions.values())
        lines: List[str] = []

        for action in actions:
            # The JSON schema of the parameter model gives the LLM the detailed
            # parameter list; only name, required-ness and type are kept to
            # avoid an overly verbose prompt.
            schema_info = action.param_model.model_json_schema()
            props = schema_info.get("properties", {})
            required = schema_info.get("required", [])
            param_details = [
                f"`{name}`{'*' if name in required else ''} "
                f"({details.get('type', 'any')})"
                for name, details in props.items()
            ]

            # The bullet uses the action name as registered on the controller.
            lines.append(f"- **`{action.name}`**: {action.description}")
            if param_details:
                lines.append(f" - Parameters: {', '.join(param_details)}")

        # Add the agent description manually if the controller does not provide it.
        # NOTE: a similar manual entry for `extract_content` was disabled
        # (commented out) in the original implementation and is not emitted.
        if all(action.name != "agent" for action in actions):
            lines.append("- **`agent`**: Executes a task using an autonomous agent.")
            lines.append(
                " - Parameters: `task`* (string), `description` (string), `max_steps` (integer)"
            )

        markdown = "\n".join(lines)
        # Log the rendered markdown (not the list repr) so the debug output
        # matches what is injected into the prompt.
        logger.debug(f"Generated actions markdown:\n{markdown}")
        return markdown

    @staticmethod
    def _find_first_user_interaction_url(events: List[Dict[str, Any]]) -> Optional[str]:
        """Finds the URL of the first recorded user interaction.

        Args:
            events: Recorded events as dictionaries; the ``type`` and
                ``frameUrl`` keys are read.

        Returns:
            The ``frameUrl`` value of the first event whose ``type`` is a user
            interaction (which may itself be None if the key is absent), or
            None when no such event exists.
        """
        for evt in events:
            if evt.get("type") in BuilderService._USER_INTERACTION_TYPES:
                return evt.get("frameUrl")
        return None

    @staticmethod
    def _raw_output_from_parser_error(error: Exception) -> str:
        """Return the raw LLM text carried by a parser error.

        The ``llm_output`` attribute is used when it is present and non-empty;
        otherwise the exception message is returned. The attribute can exist
        with a value of None, so a plain ``getattr`` default is not enough.
        """
        raw_output = getattr(error, "llm_output", None)
        return str(raw_output) if raw_output else str(error)

    @staticmethod
    def _validated_screenshot_payload(screenshot: Any) -> Any:
        """Strip an optional data-URL prefix and validate the base64 payload.

        Args:
            screenshot: The screenshot value taken from a step, either a bare
                base64 string or a ``data:...;base64,...`` URL.

        Returns:
            The base64 payload without any data-URL prefix.

        Raises:
            TypeError: If the value cannot be base64-decoded because of its type.
            ValueError: If the payload is empty or is not valid base64.
        """
        if isinstance(screenshot, str) and screenshot.startswith("data:"):
            screenshot = screenshot.split(",", 1)[-1]
        if not screenshot:
            raise ValueError("Screenshot payload is empty.")
        # validate=True rejects characters outside the base64 alphabet.
        base64.b64decode(cast(str, screenshot), validate=True)
        return screenshot

    def _parse_llm_output_to_workflow(
        self, llm_content: str
    ) -> WorkflowDefinitionSchema:
        """Attempts to parse the LLM string output into a WorkflowDefinitionSchema.

        Args:
            llm_content: Raw text returned by the LLM. It may be bare JSON or
                JSON wrapped in a ```json fenced block.

        Returns:
            The validated WorkflowDefinitionSchema.

        Raises:
            ValueError: If the content is not valid JSON or does not validate
                against the schema.
        """
        logger.debug(f"Raw LLM Output:\n{llm_content}")
        content_to_parse = llm_content

        # Heuristic cleanup: extract JSON from markdown code blocks.
        if "```json" in content_to_parse:
            match = self._JSON_FENCE_RE.search(content_to_parse)
            if match:
                content_to_parse = match.group(1).strip()
                logger.debug("Extracted JSON from ```json block.")
            # An unterminated fence is left as-is and parsed below.
        elif content_to_parse.strip().startswith(
            "{"
        ) and content_to_parse.strip().endswith("}"):
            # Assume it's already JSON if it looks like it.
            content_to_parse = content_to_parse.strip()
            logger.debug("Assuming raw output is JSON.")
        else:
            logger.warning(
                "Could not reliably extract JSON from LLM output, attempting parse anyway."
            )

        try:
            workflow_data = WorkflowDefinitionSchema.model_validate_json(
                content_to_parse
            )
        except (json.JSONDecodeError, ValidationError) as e:
            logger.error(
                f"Failed to parse LLM output into WorkflowDefinitionSchema: {e}"
            )
            logger.debug(f"Content attempted parsing:\n{content_to_parse}")
            raise ValueError(
                f"LLM output could not be parsed into a valid Workflow schema. Error: {e}"
            ) from e

        logger.info("Successfully parsed LLM output into WorkflowDefinitionSchema.")
        return workflow_data

    async def build_workflow(
        self,
        input_workflow: WorkflowDefinitionSchema,
        user_goal: Optional[str],
        use_screenshots: bool = False,
        max_images: int = 20,
    ) -> WorkflowDefinitionSchema:
        """
        Generates an enhanced Workflow definition from an input workflow object using an LLM.

        Args:
            input_workflow: The initial WorkflowDefinitionSchema object containing steps to process.
            user_goal: Optional high-level description of the workflow's purpose.
                If None, the user is prompted interactively on stdin; an empty
                answer (or EOF) selects a default goal.
            use_screenshots: Whether to include screenshots as visual context for the LLM (if available in steps).
            max_images: Maximum number of screenshots to include (to manage cost/tokens).

        Returns:
            A new WorkflowDefinitionSchema object generated by the LLM.

        Raises:
            ValueError: If the input workflow is invalid or the LLM output cannot be parsed.
            Exception: For other LLM or processing errors.
        """
        # Validate input slightly.
        if not input_workflow or not isinstance(input_workflow.steps, list):
            raise ValueError("Invalid input_workflow object provided.")

        # Handle user goal.
        goal = user_goal
        if goal is None:
            # NOTE(review): input() blocks the event loop while waiting for the
            # user; acceptable for a CLI flow, confirm for other callers.
            try:
                goal = input(
                    "Please describe the high-level task for the workflow (optional, press Enter to skip): "
                ).strip()
            except EOFError:  # Handle non-interactive environments
                goal = ""
        goal = goal or "Automate the recorded browser actions."  # Default goal if empty

        # Format the main instruction prompt.
        prompt_str = self.prompt_template.format(
            actions=self.actions_markdown,
            goal=goal,
        )

        # The prompt is the first content part; each step then contributes a
        # JSON text part and, optionally, a screenshot.
        vision_messages: List[Dict[str, Any]] = [{"type": "text", "text": prompt_str}]
        images_used = 0

        for step in input_workflow.steps:
            step_messages: List[Dict[str, Any]] = []

            # 1. Text representation (JSON dump), with the screenshot removed so
            #    the base64 blob is not duplicated in the text part.
            step_dict = step.model_dump(mode="json", exclude_none=True)
            screenshot_data = step_dict.pop("screenshot", None)
            step_messages.append(
                {"type": "text", "text": json.dumps(step_dict, indent=2)}
            )

            # 2. Optional screenshot (never attached for "input" steps).
            attach_image = use_screenshots and images_used < max_images
            step_type = getattr(step, "type", step_dict.get("type"))
            if attach_image and step_type != "input":
                # The screenshot may live at the top level of the dump, on the
                # step model itself, or nested under 'data'.
                screenshot = (
                    screenshot_data
                    or getattr(step, "screenshot", None)
                    or step_dict.get("data", {}).get("screenshot")
                )
                if screenshot:
                    try:
                        payload = self._validated_screenshot_payload(screenshot)
                    except (TypeError, ValueError) as e:
                        # Invalid image: keep the text part, skip the image.
                        logger.warning(
                            f"Invalid or missing screenshot for step type '{step_type}' "
                            f"@ {step_dict.get('timestamp', '')}. Error: {e}"
                        )
                    else:
                        meta = f"<Screenshot for event type '{step_type}'>"
                        step_messages.append({"type": "text", "text": meta})
                        step_messages.append(
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/png;base64,{payload}"
                                },
                            }
                        )
                        # Count only images that were actually attached.
                        images_used += 1

            # Add the messages for this step to the main list.
            vision_messages.extend(step_messages)

        logger.info(
            f"Prepared {len(vision_messages)} total message parts, including {images_used} images."
        )

        # Invoke the LLM (structured output preferred).
        try:
            llm_response = await self.llm_structured.ainvoke(
                [HumanMessage(content=cast(Any, vision_messages))]
            )
            if isinstance(llm_response, WorkflowDefinitionSchema):
                # Structured output worked: the response is the Pydantic object.
                workflow_data = llm_response
            else:
                # Plain model (or unexpected shape): parse the message content,
                # falling back to the string form of the response itself.
                llm_content = str(getattr(llm_response, "content", llm_response))
                workflow_data = self._parse_llm_output_to_workflow(llm_content)
        except OutputParserException as ope:
            logger.error(f"LLM output parsing failed (OutputParserException): {ope}")
            # Try to parse the raw output as a fallback.
            raw_output = self._raw_output_from_parser_error(ope)
            logger.info("Attempting to parse raw output as fallback...")
            try:
                workflow_data = self._parse_llm_output_to_workflow(raw_output)
            except ValueError as ve_fallback:
                raise ValueError(
                    f"LLM structured output failed, and fallback parsing also failed. Error: {ve_fallback}"
                ) from ve_fallback
        except Exception as e:
            logger.exception(
                f"An error occurred during LLM invocation or processing: {e}"
            )
            raise  # Re-raise other unexpected errors

        # Return the workflow data object directly.
        return workflow_data

    # path handlers
    async def build_workflow_from_path(
        self, path: Path, user_goal: Optional[str]
    ) -> WorkflowDefinitionSchema:
        """Build a workflow from a JSON file path.

        Args:
            path: Path to a UTF-8 JSON file holding a workflow definition.
            user_goal: Goal forwarded unchanged to ``build_workflow``.

        Returns:
            The WorkflowDefinitionSchema produced by ``build_workflow``.
        """
        with open(path, "r", encoding="utf-8") as f:
            workflow_data = json.load(f)
        workflow_data_schema = WorkflowDefinitionSchema.model_validate(workflow_data)
        return await self.build_workflow(workflow_data_schema, user_goal)

    async def save_workflow_to_path(
        self, workflow: WorkflowDefinitionSchema, path: Path
    ):
        """Save a workflow to a JSON file path.

        Args:
            workflow: The workflow to serialize.
            path: Destination file; it is overwritten if it exists.
        """
        # Write with the same encoding build_workflow_from_path reads with.
        with open(path, "w", encoding="utf-8") as f:
            json.dump(workflow.model_dump(mode="json"), f, indent=2)