Commit 8f2b9dc

Add update-task-v2 endpoint with tight execute loop and reduce log verbosity (#383)
1 parent 945e805 commit 8f2b9dc

25 files changed

Lines changed: 1301 additions & 436 deletions

AGENTS.md

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -779,3 +779,24 @@ Remember: The goal is to make Conductor easy to use in every language while main
779779
780780
---
781781
782+
## 🧪 Post-Session Testing Checklist
783+
784+
**After every coding session, run the full test suite to ensure zero failures:**
785+
786+
```bash
# All suites: expect 0 failures, 0 errors
python3 -m pytest tests/unit tests/backwardcompatibility tests/serdesertest tests/chaos tests/integration -v

# Expected results:
# Unit tests: ~626 passed
# Backward compatibility: ~1015 passed
# Serialization: ~58 passed
# Chaos: 2 skipped (require special setup)
# Integration: 128 skipped (require live Conductor server)
# TOTAL: 0 failures, 0 errors
```
798+
799+
Integration tests skip gracefully when the Conductor server is not available (no `CONDUCTOR_SERVER_URL` / `CONDUCTOR_AUTH_KEY` / `CONDUCTOR_AUTH_SECRET` env vars). When a server is available, they run against it. **There should be NO failures in any suite.**
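A minimal sketch of how such a graceful skip could be wired up with `pytest.mark.skipif`. The names `server_configured`, `requires_server`, and `test_server_url_is_set` are hypothetical, and the sketch assumes all three environment variables must be set, which may be stricter than what the SDK's own fixtures check:

```python
import os

import pytest

REQUIRED_VARS = ("CONDUCTOR_SERVER_URL", "CONDUCTOR_AUTH_KEY", "CONDUCTOR_AUTH_SECRET")


def server_configured(env=os.environ) -> bool:
    """Return True only when every required variable is set and non-empty."""
    return all(env.get(name) for name in REQUIRED_VARS)


# Hypothetical marker: tests decorated with it are reported as skipped,
# not failed, when the server configuration is missing.
requires_server = pytest.mark.skipif(
    not server_configured(),
    reason="Conductor server not configured",
)


@requires_server
def test_server_url_is_set():
    assert os.environ["CONDUCTOR_SERVER_URL"]
```

Evaluating the condition once at import time keeps the skip decision out of the test bodies, so a missing server shows up as skips rather than errors.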
800+
801+
---
802+

README.md

Lines changed: 168 additions & 170 deletions
Large diffs are not rendered by default.

docs/design/WORKER_SDK_IMPLEMENTATION_GUIDE.md

Lines changed: 203 additions & 5 deletions
@@ -203,6 +203,10 @@ Initialize → Register Task Def → Start Polling → Execute Tasks → Update
203203
- Manages lifecycle (start, stop, restart)
204204
- Provides configuration to workers
205205
- Coordinates metrics collection
206+
- Monitors and auto-restarts crashed worker processes (see Section 4.4)
207+
- Provides health check APIs for container orchestrators
208+
- Supports `import_modules` to force-import modules before scanning for decorated workers
209+
- Implements context manager protocol (`with TaskHandler(...) as th:`) for clean lifecycle
206210

207211
2. **TaskRunner (Execution Engine)**
208212
- Runs in worker process
@@ -241,11 +245,20 @@ class TaskHandler {
241245
MetricsSettings metricsSettings
242246
List<EventListener> eventListeners
243247
248+
// Supervision settings
249+
Bool monitorProcesses // default: true
250+
Bool restartOnFailure // default: true
251+
Int restartMaxAttempts // default: 0 (unlimited)
252+
Float restartBackoffSeconds // default: 5.0
253+
Float restartBackoffMaxSeconds // default: 300.0
254+
244255
// Methods
245256
discover_workers() → List<Worker>
246257
start_processes()
247258
stop_processes()
248259
join_processes()
260+
is_healthy() → Bool
261+
get_worker_process_status() → Map<String, ProcessStatus>
249262
}
250263
251264
// Worker metadata
@@ -263,6 +276,7 @@ class Worker {
263276
Bool overwriteTaskDef
264277
Bool strictSchema
265278
Bool paused
279+
Bool leaseExtendEnabled // Auto-extend task lease for long-running tasks
266280
}
267281
268282
// Execution engine (one per worker process)
@@ -340,6 +354,103 @@ FUNCTION detect_worker_type(worker_function):
340354
- Rust: Check for `async fn` keyword
341355
- JavaScript/TypeScript: Check for `async function`
342356

357+
### 4.4 Worker Process Supervision
358+
359+
**Key Principle:** Worker processes must be monitored and auto-restarted on failure for production reliability.
360+
361+
**Architecture:**
362+
- TaskHandler spawns a background monitor thread after `start_processes()`
363+
- Monitor thread periodically checks if each worker process is alive
364+
- Dead processes are restarted with exponential backoff to prevent crash loops
365+
366+
```
FUNCTION start_monitor():
    IF not monitor_processes:
        RETURN

    // Spawn background thread
    monitor_thread = Thread(target=monitor_loop, daemon=True)
    monitor_thread.start()

FUNCTION monitor_loop():
    WHILE not shutdown_requested:
        check_and_restart_processes()
        sleep(monitor_interval_seconds)  // default: 5s

FUNCTION check_and_restart_processes():
    LOCK process_lock:
        FOR i, process IN enumerate(worker_processes):
            IF process.is_alive():
                CONTINUE

            exitcode = process.exitcode
            worker_name = workers[i].task_definition_name

            log_warning("Worker process exited (worker={worker_name}, exitcode={exitcode})")

            IF not restart_on_failure:
                CONTINUE

            restart_worker_process(i)

FUNCTION restart_worker_process(index: Int):
    worker_name = workers[index].task_definition_name

    // Enforce max attempts (0 = unlimited)
    IF restart_max_attempts > 0 AND restart_counts[index] >= restart_max_attempts:
        log_error("Max restart attempts reached for worker {worker_name}")
        RETURN

    // Exponential backoff per-worker to prevent tight crash loops
    now = current_time()
    IF now < next_restart_at[index]:
        RETURN  // Still in backoff period

    backoff = min(
        restart_backoff_seconds * (2 ^ restart_counts[index]),
        restart_backoff_max_seconds
    )
    next_restart_at[index] = now + backoff

    // Reap old process (avoid zombie accumulation)
    old_process = worker_processes[index]
    old_process.join(timeout=0)
    old_process.close()

    // Spawn new process
    new_process = build_process_for_worker(workers[index])
    worker_processes[index] = new_process
    new_process.start()
    restart_counts[index] += 1

    // Metrics
    increment_metric("worker_restart_total", {task_type: worker_name})

    log_info("Restarted worker (worker={worker_name}, attempt={restart_counts[index]}, backoff={backoff}s)")
```
428+
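To make the backoff formula concrete, the following sketch computes the schedule it implies under the documented defaults (`restart_backoff_seconds = 5.0`, `restart_backoff_max_seconds = 300.0`). The function name `restart_backoff` is illustrative and not part of the SDK:

```python
def restart_backoff(restart_count: int, base: float = 5.0, cap: float = 300.0) -> float:
    """Backoff in seconds for a worker that has already been restarted `restart_count` times."""
    # Double the base delay for each prior restart, then clamp to the cap.
    return min(base * (2 ** restart_count), cap)


schedule = [restart_backoff(n) for n in range(8)]
print(schedule)  # [5.0, 10.0, 20.0, 40.0, 80.0, 160.0, 300.0, 300.0]
```

With these defaults the cap is reached on the seventh restart. In the pseudocode the computed value gates the next restart through `next_restart_at`, so a worker that keeps crashing settles at one restart attempt every 300 seconds.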
429+
**Configuration:**
430+
431+
| Property | Type | Default | Description |
432+
|----------|------|---------|-------------|
433+
| `monitor_processes` | Bool | true | Enable process supervision |
434+
| `restart_on_failure` | Bool | true | Auto-restart crashed workers |
435+
| `restart_max_attempts` | Int | 0 | Max restarts per worker (0 = unlimited) |
436+
| `restart_backoff_seconds` | Float | 5.0 | Initial backoff before restart |
437+
| `restart_backoff_max_seconds` | Float | 300.0 | Maximum backoff cap |
438+
| `monitor_interval_seconds` | Float | 5.0 | Health check interval |
439+
440+
**Health Check API:**
441+
442+
```
FUNCTION is_healthy() → Bool:
    FOR process IN worker_processes:
        IF not process.is_alive():
            RETURN false
    RETURN true

FUNCTION get_worker_process_status() → Map<String, ProcessStatus>:
    // Returns per-worker status: alive, pid, exitcode, restart_count
    // Useful for /healthcheck endpoints in web frameworks
```
453+
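One way the two health check functions could look in Python is sketched below. It is written against any object exposing `is_alive()`, `pid`, and `exitcode` (the attributes `multiprocessing.Process` provides). `ProcessLike`, `FakeProcess`, and the explicit function parameters are assumptions made to keep the example self-contained, and the `ProcessStatus` fields follow the comment in the pseudocode rather than a confirmed SDK type:

```python
from dataclasses import dataclass
from typing import Dict, Optional, Protocol, Sequence


class ProcessLike(Protocol):
    """Structural type covering the parts of a process object used here."""
    pid: Optional[int]
    exitcode: Optional[int]

    def is_alive(self) -> bool: ...


@dataclass(frozen=True)
class ProcessStatus:
    alive: bool
    pid: Optional[int]
    exitcode: Optional[int]
    restart_count: int


def is_healthy(processes: Sequence[ProcessLike]) -> bool:
    # Healthy only if every worker process is currently alive.
    return all(p.is_alive() for p in processes)


def get_worker_process_status(
    names: Sequence[str],
    processes: Sequence[ProcessLike],
    restart_counts: Sequence[int],
) -> Dict[str, ProcessStatus]:
    # One status entry per worker, keyed by task definition name.
    return {
        name: ProcessStatus(p.is_alive(), p.pid, p.exitcode, count)
        for name, p, count in zip(names, processes, restart_counts)
    }


@dataclass
class FakeProcess:
    """Test stand-in: a process is 'alive' until it has an exit code."""
    pid: Optional[int]
    exitcode: Optional[int]

    def is_alive(self) -> bool:
        return self.exitcode is None


procs = [FakeProcess(pid=101, exitcode=None), FakeProcess(pid=102, exitcode=1)]
print(is_healthy(procs))  # False: the second worker has exited
print(get_worker_process_status(["greet", "billing"], procs, [0, 3])["billing"])
```

A web framework's `/healthcheck` handler could return HTTP 200 or 503 based on `is_healthy` and include the status map in the response body for diagnostics.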
343454
---
344455

345456
## 5. Polling & Execution Loop
@@ -415,7 +526,7 @@ FUNCTION batch_poll(count: Int) → List<Task>:
415526
params = {
416527
"workerid": worker_id,
417528
"count": count,
418-
"timeout": 100 // ms, server-side long poll
529+
"timeout": worker.poll_timeout // ms, server-side long poll (default: 100)
419530
}
420531
421532
// Only include domain if not null/empty
@@ -624,12 +735,66 @@ FUNCTION update_task(task_result: TaskResult):
624735

625736
**Why This Matters:** Task was executed successfully, but Conductor doesn't know. External systems must handle recovery.
626737

738+
### 5.5b v2 Update Endpoint & Task Chaining (Optimization)
739+
740+
**Key Principle:** The v2 update endpoint returns the next task to process, eliminating a round-trip poll.
741+
742+
Instead of the pattern: execute → update → poll → execute → update → poll, the v2 endpoint enables: execute → update+poll → execute → update+poll.
743+
744+
```
FUNCTION update_task_v2(task_result: TaskResult) → Task | null:
    // Same retry logic as update_task (Section 5.5)
    // BUT: response is a Task object (the next task to process) or null

    FOR attempt IN [0, 1, 2, 3]:
        IF attempt > 0:
            sleep(attempt * 10 seconds)

        TRY:
            next_task = http_client.update_task_v2(task_result)
            RETURN next_task  // May be null if no pending tasks

        CATCH Exception:
            // Same retry logic as v1

    RETURN null  // All retries failed; caller falls back to normal polling
```
762+
763+
**Execute-Update Loop:**
764+
765+
```
FUNCTION execute_and_update_task(task: Task):
    // Tight loop: execute → update_v2 (get next) → execute → ...
    WHILE task is not null AND not shutdown:
        result = execute_task(task)

        // TaskInProgress or async: stop chaining
        IF result is null OR result is TaskInProgress:
            RETURN

        // Update AND get next task in one call
        task = update_task_v2(result)
```
778+
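The chaining behaviour of this loop can be sketched in Python with an in-memory stand-in for the server. `FakeTaskClient` and `execute_and_update` are hypothetical names, tasks and results are plain strings for brevity, and retries and shutdown handling are left out:

```python
from collections import deque
from typing import Callable, Deque, List, Optional


class FakeTaskClient:
    """Hypothetical stand-in for the HTTP client; not the SDK's real class."""

    def __init__(self, pending: List[str]) -> None:
        self.pending: Deque[str] = deque(pending)
        self.updates: List[str] = []

    def update_task_v2(self, result: str) -> Optional[str]:
        # Record the result, then hand back the next pending task (or None).
        self.updates.append(result)
        return self.pending.popleft() if self.pending else None


def execute_and_update(
    task: Optional[str],
    client: FakeTaskClient,
    execute: Callable[[str], Optional[str]],
) -> int:
    """Chain tasks until the server returns none; return the number of updates sent."""
    sent = 0
    while task is not None:
        result = execute(task)
        if result is None:  # in-progress or async result: stop chaining
            return sent
        task = client.update_task_v2(result)
        sent += 1
    return sent


client = FakeTaskClient(pending=["t2", "t3"])
count = execute_and_update("t1", client, execute=lambda t: f"{t}:done")
print(count, client.updates)  # 3 ['t1:done', 't2:done', 't3:done']
```

Here three tasks complete after a single initial poll: each update call doubles as the poll for the next task, and the loop hands control back to the regular poller only when the server returns nothing.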
779+
**Benefits:**
780+
- ~50% fewer HTTP round-trips under load (update + poll combined)
781+
- Lower latency between consecutive tasks
782+
- Backward compatible: falls back to normal polling when v2 returns null
783+
784+
**HTTP Endpoint:**
785+
```
786+
POST /api/tasks/update-v2
787+
Body: TaskResult (JSON)
788+
Response: Task | null (next task to process for same task type)
789+
```
790+
627791
### 5.6 Capacity Management
628792

629793
**Key Principle:** Capacity represents end-to-end task handling (execute + update)
630794

795+
**Async Workers (Explicit Semaphore):**
631796
```
632-
// Semaphore/capacity held during BOTH execute and update
797+
// Semaphore held during BOTH execute and update
633798
FUNCTION execute_and_update_task(task: Task):
634799
ACQUIRE semaphore: // Blocks if at capacity
635800
result = execute_task(task)
@@ -641,7 +806,23 @@ FUNCTION execute_and_update_task(task: Task):
641806
// Only then can new task be polled
642807
```
643808

644-
**Why:** Ensures we don't poll more tasks than we can fully handle (execute AND update).
809+
**Sync Workers (Implicit via Thread Pool):**
810+
```
// Thread pool naturally provides capacity management.
// Each thread runs execute_and_update_task; the thread stays
// occupied during BOTH execute and update, so the pool size
// (= thread_count) limits concurrency without an explicit semaphore.
FUNCTION execute_and_update_task(task: Task):
    // Runs inside ThreadPoolExecutor(max_workers=thread_count)
    result = execute_task(task)

    IF result is not TaskInProgress:
        update_task(result)

    // Thread returns to pool; capacity slot freed
```
824+
825+
**Why:** Ensures we don't poll more tasks than we can fully handle (execute AND update). Both approaches achieve the same goal — async uses explicit semaphore, sync uses thread pool sizing.
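As an illustration of the async variant, the sketch below holds an `asyncio.Semaphore` across both the execute and update phases and records the peak number of tasks in flight. The function names and the `asyncio.sleep` placeholders are assumptions standing in for real task execution and HTTP updates:

```python
import asyncio
from typing import Iterable


async def run_with_capacity(task_ids: Iterable[int], capacity: int) -> int:
    """Run tasks under a semaphore and return the peak number in flight."""
    semaphore = asyncio.Semaphore(capacity)
    in_flight = 0
    peak = 0

    async def execute_and_update(task_id: int) -> None:
        nonlocal in_flight, peak
        async with semaphore:  # slot held across execute AND update
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # placeholder for execute_task
            await asyncio.sleep(0.01)  # placeholder for update_task
            in_flight -= 1

    await asyncio.gather(*(execute_and_update(t) for t in task_ids))
    return peak


peak = asyncio.run(run_with_capacity(range(10), capacity=3))
print(peak)  # 3
```

Releasing the slot only after the update completes is what keeps the number of polled but unfinished tasks bounded by the configured capacity.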
645826

646827
---
647828

@@ -667,6 +848,7 @@ FUNCTION execute_and_update_task(task: Task):
667848
| `overwrite_task_def` | Bool | true | Overwrite existing task definitions |
668849
| `strict_schema` | Bool | false | Enforce strict JSON schema validation |
669850
| `paused` | Bool | false | Pause worker (stop polling) |
851+
| `lease_extend_enabled` | Bool | false | Auto-extend task lease for long-running tasks (alternative to TaskInProgress) |
670852

671853
### 6.3 Environment Variable Format
672854

@@ -1144,6 +1326,8 @@ FUNCTION reset_auth_failures():
11441326
auth_failures = 0
11451327
```
11461328

1329+
**When to Reset:** Auth failures should be reset when a poll succeeds (200 response), regardless of whether tasks were returned. A successful HTTP response means authentication is working.
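The reset rule can be sketched as a small tracker, shown below. `AuthFailureTracker` and `on_poll_response` are illustrative names, and treating HTTP 401 and 403 as the responses that count as auth failures is an assumption not stated in this section:

```python
from typing import List


class AuthFailureTracker:
    """Counts consecutive auth failures seen while polling."""

    def __init__(self) -> None:
        self.auth_failures = 0

    def on_poll_response(self, status_code: int, tasks: List[dict]) -> None:
        if status_code == 200:
            # Reset even when `tasks` is empty: the request authenticated fine.
            self.auth_failures = 0
        elif status_code in (401, 403):  # assumption: these count as auth failures
            self.auth_failures += 1


tracker = AuthFailureTracker()
tracker.on_poll_response(401, [])
tracker.on_poll_response(401, [])
tracker.on_poll_response(200, [])  # empty poll, but authentication worked
print(tracker.auth_failures)  # 0
```

Keying the reset on the HTTP status rather than on the number of returned tasks prevents an idle queue from leaving a stale failure count in place.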
1330+
11471331
### 9.3 Adaptive Backoff for Empty Polls
11481332

11491333
```
@@ -1440,14 +1624,23 @@ Query Params:
14401624
Response: List<Task>
14411625
```
14421626

1443-
**Update Task:**
1627+
**Update Task (v1):**
14441628
```
14451629
POST /api/tasks
14461630
Body: TaskResult (JSON)
14471631
14481632
Response: string (task status)
14491633
```
14501634

1635+
**Update Task (v2) — Recommended:**
1636+
```
1637+
POST /api/tasks/update-v2
1638+
Body: TaskResult (JSON)
1639+
1640+
Response: Task | null (next task to process for same task type)
1641+
```
1642+
The v2 endpoint combines update + poll: it updates the current task result and returns the next pending task (if any) for the same task type. This enables the execute-update loop optimization described in Section 5.5b.
1643+
14511644
**Register Task Definition:**
14521645
```
14531646
POST /api/metadata/taskdefs
@@ -1782,7 +1975,12 @@ FUNCTION validate_and_process_order(order_id: String) → Result:
17821975

17831976
### 16.2 Long-Running Tasks (TaskInProgress)
17841977

1785-
**Pattern:** Return TaskInProgress to extend lease
1978+
**Two approaches for long-running tasks:**
1979+
1980+
1. **TaskInProgress (manual):** Worker returns `TaskInProgress` to re-queue itself with a callback delay. Best for tasks that need incremental progress tracking.
1981+
2. **Lease Extension (automatic):** Set `lease_extend_enabled=true` on the worker — the SDK automatically extends the task lease periodically. Best for tasks that run continuously without needing poll-based progress.
1982+
1983+
**Pattern 1: TaskInProgress — Return to re-queue**
17861984

17871985
```
17881986
class TaskInProgress {

pyproject.toml

Lines changed: 3 additions & 0 deletions
@@ -153,6 +153,9 @@ line-ending = "auto"
153153
"tests/**/*.py" = ["B", "C4", "SIM"]
154154
"examples/**/*.py" = ["B", "C4", "SIM"]
155155

156+
[tool.pytest.ini_options]
157+
pythonpath = ["src"]
158+
156159
[tool.coverage.run]
157160
source = ["src/conductor"]
158161
omit = [

src/conductor/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -1 +1,6 @@
1+
from __future__ import annotations
2+
3+
from pkgutil import extend_path
4+
5+
__path__ = extend_path(__path__, __name__)
16
__version__ = "1.1.10"
