-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkflow_manager.py
More file actions
226 lines (200 loc) · 9.78 KB
/
workflow_manager.py
File metadata and controls
226 lines (200 loc) · 9.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""Workflow manager: orchestrates the HR automation pipeline."""
import logging
from typing import Any, Optional, Protocol
from task import Task, TaskStatus, Rubric
from planner_agent import PlannerAgent
from screening_agent import ScreeningAgent
from interview_agent import InterviewAgent
from evaluation_agent import EvaluationAgent
from mock_execution_agent import MockExecutionAgent
from verification_agent import VerificationAgent
from log_utils import log_step
logger = logging.getLogger(__name__)
BANNER = (
"==================================================\n"
"NovaOps Agent - AI Recruiting Workflow Automation\n"
"=================================================="
)
# Canonical result keys for consistent JSON output.
RESULT_CANDIDATE = "candidate"
RESULT_OVERALL_SCORE = "overall_score"
RESULT_RECOMMENDATION = "recommendation"
RESULT_STATUS = "status"
RESULT_ERROR = "error"
RESULT_CATEGORY_SCORES = "category_scores"
def result_to_json(task: Task) -> dict[str, Any]:
"""
Return consistent final output. Only evaluation_agent drives recommendation.
Status normalized to "completed" when complete. Safe defaults if pipeline failed.
"""
evaluation = task.evaluation or {}
candidate_name = None
if isinstance(task.candidate_data, dict):
candidate_name = task.candidate_data.get("name")
status = task.status.value
if status == "complete":
status = "completed"
# Final recommendation comes only from evaluation_agent
recommendation = evaluation.get("recommendation")
if recommendation is None and task.error:
recommendation = "Reject"
out: dict[str, Any] = {
RESULT_CANDIDATE: candidate_name,
RESULT_OVERALL_SCORE: evaluation.get("overall_score"),
RESULT_RECOMMENDATION: recommendation,
RESULT_STATUS: status,
RESULT_ERROR: task.error,
}
if "category_scores" in evaluation and evaluation["category_scores"]:
out[RESULT_CATEGORY_SCORES] = evaluation["category_scores"]
return out
class ExecutionAgentProtocol(Protocol):
"""Protocol for execution agents. Allows swapping MockExecutionAgent for NovaActExecutionAgent."""
def execute(self, task: Task) -> Task:
...
class WorkflowManager:
"""
Orchestrates the full pipeline: plan → interview → evaluate → execute → verify.
Controls status transitions and handles errors. No agent calls another agent.
"""
def __init__(
self,
execution_agent: Optional[ExecutionAgentProtocol] = None,
) -> None:
self._planner = PlannerAgent()
self._screening_agent = ScreeningAgent()
self._interview_agent = InterviewAgent()
self._evaluation_agent = EvaluationAgent()
self._execution_agent = execution_agent if execution_agent is not None else MockExecutionAgent()
self._verification_agent = VerificationAgent()
def _ensure_status(self, task: Task, expected: TaskStatus, stage: str) -> Task:
"""If task is failed or has unexpected status, return failed task; else return task."""
if task.status == TaskStatus.failed:
return task
if task.status != expected:
return task.model_copy(
update={
"status": TaskStatus.failed,
"error": f"{stage}: expected status {expected.value}, got {task.status.value}",
},
deep=True,
)
return task
def _mark_failed(self, task: Task, stage: str, message: str) -> Task:
"""Return a copy of task with status=failed and error set to a consistent string."""
return task.model_copy(
update={
"status": TaskStatus.failed,
"error": f"[{stage}] {message}",
},
deep=True,
)
def run(
self,
job_description: str,
candidate_data: dict[str, Any],
rubric: Optional[dict[str, Any]] = None,
resume_file: Optional[str] = None,
) -> Task:
"""
Run the full pipeline. On any failure the task is marked failed and returned.
Errors are caught per step and as a top-level fallback; error message is always set when failed.
resume_file: optional label (e.g. "resumes/strong_candidate.txt") to print which candidate is being evaluated.
"""
print(BANNER)
if resume_file:
print(f"\nEvaluating Candidate: {resume_file}\n")
task: Task
try:
rubric_model = None
if rubric is not None:
try:
rubric_model = Rubric.model_validate(rubric)
except Exception as e: # noqa: BLE001
logger.warning("rubric validation failed: %s", e)
task = Task(
job_description=job_description,
candidate_data=candidate_data,
rubric=None,
)
log_step("Workflow Manager", "Pipeline failed", {"error": f"Invalid rubric: {e!s}"})
return self._mark_failed(task, "init", f"Invalid rubric: {e!s}")
task = Task(
job_description=job_description,
candidate_data=candidate_data,
rubric=rubric_model,
)
except Exception as e: # noqa: BLE001
logger.exception("task init failed: %s", e)
task = Task(job_description=job_description, candidate_data=candidate_data)
log_step("Workflow Manager", "Pipeline failed", {"error": str(e)})
return self._mark_failed(task, "init", str(e))
log_step("Workflow Manager", "Starting pipeline")
logger.info("stage=create task_id=%s status=%s", task.task_id, task.status.value)
try:
task = self._planner.plan(task)
except Exception as e: # noqa: BLE001
logger.exception("stage=plan task_id=%s error=%s", task.task_id, e)
log_step("Workflow Manager", "Pipeline failed", {"error": str(e)})
return self._mark_failed(task, "plan", str(e))
task = self._ensure_status(task, TaskStatus.planned, "plan")
if task.status == TaskStatus.failed:
logger.warning("stage=plan task_id=%s status=failed error=%s", task.task_id, task.error)
log_step("Workflow Manager", "Pipeline failed", {"error": task.error})
return task
logger.info("stage=plan task_id=%s status=%s", task.task_id, task.status.value)
try:
task = self._screening_agent.screen(task)
except Exception as e: # noqa: BLE001
logger.exception("stage=screening task_id=%s error=%s", task.task_id, e)
log_step("Workflow Manager", "Pipeline failed", {"error": str(e)})
return self._mark_failed(task, "screening", str(e))
try:
task = self._interview_agent.conduct_interview(task)
except Exception as e: # noqa: BLE001
logger.exception("stage=interview task_id=%s error=%s", task.task_id, e)
log_step("Workflow Manager", "Pipeline failed", {"error": str(e)})
return self._mark_failed(task, "interview", str(e))
task = self._ensure_status(task, TaskStatus.interviewing, "interview")
if task.status == TaskStatus.failed:
logger.warning("stage=interview task_id=%s status=failed error=%s", task.task_id, task.error)
log_step("Workflow Manager", "Pipeline failed", {"error": task.error})
return task
logger.info("stage=interview task_id=%s status=%s", task.task_id, task.status.value)
try:
task = self._evaluation_agent.evaluate(task)
except Exception as e: # noqa: BLE001
logger.exception("stage=evaluate task_id=%s error=%s", task.task_id, e)
log_step("Workflow Manager", "Pipeline failed", {"error": str(e)})
return self._mark_failed(task, "evaluate", str(e))
task = self._ensure_status(task, TaskStatus.evaluated, "evaluate")
if task.status == TaskStatus.failed:
logger.warning("stage=evaluate task_id=%s status=failed error=%s", task.task_id, task.error)
log_step("Workflow Manager", "Pipeline failed", {"error": task.error})
return task
logger.info("stage=evaluate task_id=%s status=%s", task.task_id, task.status.value)
try:
task = self._execution_agent.execute(task)
except Exception as e: # noqa: BLE001
logger.exception("stage=execute task_id=%s error=%s", task.task_id, e)
log_step("Workflow Manager", "Pipeline failed", {"error": str(e)})
return self._mark_failed(task, "execute", str(e))
task = self._ensure_status(task, TaskStatus.verified, "execute")
if task.status == TaskStatus.failed:
logger.warning("stage=execute task_id=%s status=failed error=%s", task.task_id, task.error)
log_step("Workflow Manager", "Pipeline failed", {"error": task.error})
return task
logger.info("stage=execute task_id=%s status=%s", task.task_id, task.status.value)
try:
task = self._verification_agent.verify(task)
except Exception as e: # noqa: BLE001
logger.exception("stage=verify task_id=%s error=%s", task.task_id, e)
log_step("Workflow Manager", "Pipeline failed", {"error": str(e)})
return self._mark_failed(task, "verify", str(e))
if task.status == TaskStatus.failed:
logger.warning("stage=verify task_id=%s status=failed error=%s", task.task_id, task.error)
log_step("Workflow Manager", "Pipeline failed", {"error": task.error})
return task
logger.info("stage=verify task_id=%s status=%s", task.task_id, task.status.value)
log_step("Workflow Manager", "Pipeline completed")
return task