-
Notifications
You must be signed in to change notification settings - Fork 323
Expand file tree
/
Copy pathcli.py
More file actions
423 lines (364 loc) · 14.3 KB
/
cli.py
File metadata and controls
423 lines (364 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
import asyncio
import json
import os
import tempfile # For temporary file handling
from pathlib import Path
import typer
from fastapi import FastAPI
from workflow_use.builder.service import BuilderService
from workflow_use.recorder.service import RecordingService
# --- Gemini API Key support ---
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "YOURGEMINIAPIKEY")
# --- FastAPI app for orchestration ---
fastapi_app = FastAPI()
try:
from workflow_use.orchestrator.mcp_orchestrator import router as mcp_router
fastapi_app.include_router(mcp_router, prefix="/api/enterprise")
except ImportError:
pass
# Default LLM instance to None
llm_instance = None
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
if OPENAI_API_KEY:
try:
from langchain_openai import ChatOpenAI
llm_instance = ChatOpenAI(model='gpt-4o')
except Exception as e:
typer.secho(f'Error initializing OpenAI LLM: {e}', fg=typer.colors.RED)
else:
# Use Gemini as default LLM if OpenAI key is not set
class GeminiLLM:
def __init__(self, api_key):
self.api_key = api_key
self.model = "gemini-2.5-flash-preview-05-20"
def run(self, prompt):
import requests
url = f"https://generativelanguage.googleapis.com/v1beta/models/{self.model}:generateContent?key={self.api_key}"
payload = {"contents": [{"parts": [{"text": prompt}]}]}
headers = {"Content-Type": "application/json"}
resp = requests.post(url, json=payload, headers=headers)
return resp.json()
def with_structured_output(self, *args, **kwargs):
# For compatibility with BuilderService, just return self
return self
llm_instance = GeminiLLM(GEMINI_API_KEY)
builder_service = BuilderService(llm=llm_instance) if llm_instance else None
# recorder_service = RecorderService() # Placeholder
recording_service = (
RecordingService()
) # Assuming RecordingService does not need LLM, or handle its potential None state if it does.
def get_default_save_dir() -> Path:
"""Returns the default save directory for workflows."""
# Ensure ./tmp exists for temporary files as well if we use it
tmp_dir = Path('./tmp').resolve()
tmp_dir.mkdir(parents=True, exist_ok=True)
return tmp_dir
# --- Helper function for building and saving workflow ---
def _build_and_save_workflow_from_recording(
recording_path: Path,
default_save_dir: Path,
is_temp_recording: bool = False, # To adjust messages if it's from a live recording
) -> Path | None:
"""Builds a workflow from a recording file, prompts for details, and saves it."""
if not builder_service:
typer.secho(
'BuilderService not initialized. Cannot build workflow.',
fg=typer.colors.RED,
)
return None
prompt_subject = 'recorded' if is_temp_recording else 'provided'
typer.echo() # Add space
description: str = typer.prompt(typer.style(f'What is the purpose of this {prompt_subject} workflow?', bold=True))
typer.echo() # Add space
output_dir_str: str = typer.prompt(
typer.style('Where would you like to save the final built workflow?', bold=True)
+ f" (e.g., ./my_workflows, press Enter for '{default_save_dir}')",
default=str(default_save_dir),
)
output_dir = Path(output_dir_str).resolve()
output_dir.mkdir(parents=True, exist_ok=True)
typer.echo(f'The final built workflow will be saved in: {typer.style(str(output_dir), fg=typer.colors.CYAN)}')
typer.echo() # Add space
typer.echo(
f'Processing recording ({typer.style(str(recording_path.name), fg=typer.colors.MAGENTA)}) and building workflow...'
)
try:
workflow_definition = asyncio.run(
builder_service.build_workflow_from_path(
recording_path,
description,
)
)
except FileNotFoundError:
typer.secho(
f'Error: Recording file not found at {recording_path}. Please ensure it exists.',
fg=typer.colors.RED,
)
return None
except Exception as e:
typer.secho(f'Error building workflow: {e}', fg=typer.colors.RED)
return None
if not workflow_definition:
typer.secho(
f'Failed to build workflow definition from the {prompt_subject} recording.',
fg=typer.colors.RED,
)
return None
typer.secho('Workflow built successfully!', fg=typer.colors.GREEN, bold=True)
typer.echo() # Add space
file_stem = recording_path.stem
if is_temp_recording:
file_stem = file_stem.replace('temp_recording_', '') or 'recorded'
default_workflow_filename = f'{file_stem}.workflow.json'
workflow_output_name: str = typer.prompt(
typer.style('Enter a name for the generated workflow file', bold=True) + ' (e.g., my_search.workflow.json):',
default=default_workflow_filename,
)
# Ensure the file name ends with .json
if not workflow_output_name.endswith('.json'):
workflow_output_name = f"{workflow_output_name}.json"
final_workflow_path = output_dir / workflow_output_name
try:
asyncio.run(builder_service.save_workflow_to_path(workflow_definition, final_workflow_path))
typer.secho(
f'Final workflow definition saved to: {typer.style(str(final_workflow_path.resolve()), fg=typer.colors.BRIGHT_GREEN, bold=True)}',
fg=typer.colors.GREEN, # Overall message color
)
return final_workflow_path
except Exception as e:
typer.secho(f'Error saving workflow: {e}', fg=typer.colors.RED)
return None
cli = typer.Typer(
name='workflow-cli',
help='A CLI tool to create and run workflows.',
add_completion=False,
no_args_is_help=True,
)
@cli.command(
name='create-workflow',
help='Records a new browser interaction and then builds a workflow definition.',
)
def create_workflow():
"""
Guides the user through recording browser actions, then uses the helper
to build and save the workflow definition.
"""
if not recording_service:
# Adjusted RecordingService initialization check assuming it doesn't need LLM
# If it does, this check should be more robust (e.g. based on llm_instance)
typer.secho(
'RecordingService not available. Cannot create workflow.',
fg=typer.colors.RED,
)
raise typer.Exit(code=1)
default_tmp_dir = get_default_save_dir() # Ensures ./tmp exists for temporary files
typer.echo(typer.style('Starting interactive browser recording session...', bold=True))
typer.echo('Please follow instructions in the browser. Close the browser or follow prompts to stop recording.')
typer.echo() # Add space
temp_recording_path = None
try:
captured_recording_model = asyncio.run(recording_service.capture_workflow())
if not captured_recording_model:
typer.secho(
'Recording session ended, but no workflow data was captured.',
fg=typer.colors.YELLOW,
)
raise typer.Exit(code=1)
typer.secho('Recording captured successfully!', fg=typer.colors.GREEN, bold=True)
typer.echo() # Add space
with tempfile.NamedTemporaryFile(
mode='w',
suffix='.json',
prefix='temp_recording_',
delete=False,
dir=default_tmp_dir,
encoding='utf-8',
) as tmp_file:
try:
tmp_file.write(captured_recording_model.model_dump_json(indent=2))
except AttributeError:
json.dump(captured_recording_model, tmp_file, indent=2)
temp_recording_path = Path(tmp_file.name)
# Use the helper function to build and save
saved_path = _build_and_save_workflow_from_recording(temp_recording_path, default_tmp_dir, is_temp_recording=True)
if not saved_path:
typer.secho(
'Failed to complete workflow creation after recording.',
fg=typer.colors.RED,
)
raise typer.Exit(code=1)
except Exception as e:
typer.secho(f'An error occurred during workflow creation: {e}', fg=typer.colors.RED)
raise typer.Exit(code=1)
@cli.command(
name='build-from-recording',
help='Builds a workflow definition from an existing recording JSON file.',
)
def build_from_recording_command(
recording_path: Path = typer.Argument(
...,
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
resolve_path=True,
help='Path to the existing recording JSON file.',
),
):
"""
Takes a path to a recording JSON file, prompts for workflow details,
builds the workflow using BuilderService, and saves it.
"""
default_save_dir = get_default_save_dir()
typer.echo(
typer.style(
f'Building workflow from provided recording: {typer.style(str(recording_path.resolve()), fg=typer.colors.MAGENTA)}',
bold=True,
)
)
typer.echo() # Add space
saved_path = _build_and_save_workflow_from_recording(recording_path, default_save_dir, is_temp_recording=False)
if not saved_path:
typer.secho(f'Failed to build workflow from {recording_path.name}.', fg=typer.colors.RED)
raise typer.Exit(code=1)
@cli.command(
name='run-as-tool',
help='Runs an existing workflow and automatically parse the required variables from prompt.',
)
def run_as_tool_command(
workflow_path: Path = typer.Argument(
...,
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
help='Path to the .workflow.json file.',
show_default=False,
),
prompt: str = typer.Option(
...,
'--prompt',
'-p',
help='Prompt for the LLM to reason about and execute the workflow.',
prompt=True, # Prompts interactively if not provided
),
):
"""
Run the workflow and automatically parse the required variables from the input/prompt that the user provides.
"""
if not llm_instance:
typer.secho(
'LLM not initialized. Please check your OpenAI API key. Cannot run as tool.',
fg=typer.colors.RED,
)
raise typer.Exit(code=1)
typer.echo(
typer.style(f'Loading workflow from: {typer.style(str(workflow_path.resolve()), fg=typer.colors.MAGENTA)}', bold=True)
)
typer.echo() # Add space
try:
# Pass llm_instance to ensure the workflow can use it if needed for as_tool() or run_with_prompt()
workflow_obj = Workflow.load_from_file(str(workflow_path), llm=llm_instance)
except Exception as e:
typer.secho(f'Error loading workflow: {e}', fg=typer.colors.RED)
raise typer.Exit(code=1)
typer.secho('Workflow loaded successfully.', fg=typer.colors.GREEN, bold=True)
typer.echo() # Add space
typer.echo(typer.style(f'Running workflow as tool with prompt: "{prompt}"', bold=True))
try:
result = asyncio.run(workflow_obj.run_as_tool(prompt))
typer.secho('\nWorkflow execution completed!', fg=typer.colors.GREEN, bold=True)
typer.echo(typer.style('Result:', bold=True))
# Ensure result is JSON serializable for consistent output
try:
typer.echo(json.dumps(json.loads(result), indent=2)) # Assuming result from run_with_prompt is a JSON string
except (json.JSONDecodeError, TypeError):
typer.echo(result) # Fallback to string if not a JSON string or not serializable
except Exception as e:
typer.secho(f'Error running workflow as tool: {e}', fg=typer.colors.RED)
raise typer.Exit(code=1)
@cli.command(name='run-workflow', help='Runs an existing workflow from a JSON file.')
def run_workflow_command(
workflow_path: Path = typer.Argument(
...,
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
help='Path to the .workflow.json file.',
show_default=False,
),
):
"""
Loads and executes a workflow, prompting the user for required inputs.
"""
typer.echo(
typer.style(f'Loading workflow from: {typer.style(str(workflow_path.resolve()), fg=typer.colors.MAGENTA)}', bold=True)
)
typer.echo() # Add space
try:
# Instantiate Browser and WorkflowController for the Workflow instance
# Pass llm_instance for potential agent fallbacks or agentic steps
browser_instance = Browser() # Add any necessary config if required
controller_instance = WorkflowController() # Add any necessary config if required
workflow_obj = Workflow.load_from_file(
str(workflow_path), llm=llm_instance, browser=browser_instance, controller=controller_instance
)
except Exception as e:
typer.secho(f'Error loading workflow: {e}', fg=typer.colors.RED)
raise typer.Exit(code=1)
typer.secho('Workflow loaded successfully.', fg=typer.colors.GREEN, bold=True)
inputs = {}
input_definitions = workflow_obj.inputs_def # Access inputs_def from the Workflow instance
if input_definitions: # Check if the list is not empty
typer.echo() # Add space
typer.echo(typer.style('Provide values for the following workflow inputs:', bold=True))
typer.echo() # Add space
for input_def in input_definitions:
var_name_styled = typer.style(input_def.name, fg=typer.colors.CYAN, bold=True)
prompt_question = typer.style(f'Enter value for {var_name_styled}', bold=True)
var_type = input_def.type.lower() # type is a direct attribute
is_required = input_def.required
type_info_str = f'type: {var_type}'
if is_required:
status_str = typer.style('required', fg=typer.colors.RED)
else:
status_str = typer.style('optional', fg=typer.colors.YELLOW)
full_prompt_text = f'{prompt_question} ({status_str}, {type_info_str})'
input_val = None
if var_type == 'bool':
input_val = typer.confirm(full_prompt_text)
elif var_type == 'number':
input_val = typer.prompt(full_prompt_text, type=float)
elif var_type == 'string': # Default to string for other unknown types as well
input_val = typer.prompt(full_prompt_text, type=str)
else: # Should ideally not happen if schema is validated, but good to have a fallback
typer.secho(
f"Warning: Unknown type '{var_type}' for variable '{input_def.name}'. Treating as string.",
fg=typer.colors.YELLOW,
)
input_val = typer.prompt(full_prompt_text, type=str)
inputs[input_def.name] = input_val
typer.echo() # Add space after each prompt
else:
typer.echo('No input schema found in the workflow, or no properties defined. Proceeding without inputs.')
typer.echo() # Add space
typer.echo(typer.style('Running workflow...', bold=True))
try:
# Call run on the Workflow instance
# close_browser_at_end=True is the default for Workflow.run, but explicit for clarity
result = asyncio.run(workflow_obj.run(inputs=inputs, close_browser_at_end=True))
typer.secho('\nWorkflow execution completed!', fg=typer.colors.GREEN, bold=True)
typer.echo(typer.style('Result:', bold=True))
# Output the number of steps executed, similar to previous behavior
typer.echo(f'{typer.style(str(len(result)), bold=True)} steps executed.')
# For more detailed results, one might want to iterate through the 'result' list
# and print each item, or serialize the whole list to JSON.
# For now, sticking to the step count as per original output.
except Exception as e:
typer.secho(f'Error running workflow: {e}', fg=typer.colors.RED)
raise typer.Exit(code=1)
if __name__ == "__main__":
cli()
# Expose FastAPI app for uvicorn
app = fastapi_app