Merged
Changes from 6 commits
1 change: 1 addition & 0 deletions METRICS.md
@@ -33,6 +33,7 @@ The Conductor Python SDK includes built-in metrics collection using Prometheus t
| `task_result_size` | Gauge | `taskType` | Size of task result payload (bytes) |
| `task_execution_queue_full_total` | Counter | `taskType` | Number of times execution queue was full |
| `task_paused_total` | Counter | `taskType` | Number of polls while worker paused |
| `worker_restart_total` | Counter | `taskType` | Number of times TaskHandler restarted a worker subprocess |
| `external_payload_used_total` | Counter | `taskType`, `payloadType` | External payload storage usage count |
| `workflow_input_size` | Gauge | `workflowType`, `version` | Workflow input payload size (bytes) |
| `workflow_start_error_total` | Counter | `workflowType`, `exception` | Workflow start error count |
320 changes: 221 additions & 99 deletions README.md

Large diffs are not rendered by default.

56 changes: 54 additions & 2 deletions docs/WORKER.md
@@ -259,12 +259,64 @@ workers = [
)
]

-# If there are decorated workers in your application, scan_for_annotated_workers should be set
-# default value of scan_for_annotated_workers is False
+# TaskHandler scans for @worker_task decorated workers by default.
+# Set scan_for_annotated_workers=False if you want to disable auto-discovery.
with TaskHandler(workers, configuration, scan_for_annotated_workers=True) as task_handler:
task_handler.start_processes()
```

### Resilience: auto-restart and health checks

If you run workers as a long-lived service (e.g., alongside FastAPI/Uvicorn), the `TaskHandler` can supervise worker
processes: it monitors them and restarts any that exit unexpectedly. Supervision is enabled by default:

```python
with TaskHandler(
workers,
configuration,
scan_for_annotated_workers=True,
# Enabled by default. Set to False to disable supervision.
monitor_processes=True,
restart_on_failure=True,
) as task_handler:
task_handler.start_processes()
```

For a `/healthcheck` endpoint, you can use:

```python
task_handler.is_healthy()
task_handler.get_worker_process_status()
```

To disable restarts/monitoring (e.g., for local debugging), set:

```python
TaskHandler(..., monitor_processes=False, restart_on_failure=False)
```
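A minimal, framework-agnostic sketch of wiring these health methods into an HTTP endpoint (`healthcheck_payload` is a hypothetical helper, not part of the SDK; only `is_healthy()` and `get_worker_process_status()` come from `TaskHandler`):

```python
def healthcheck_payload(task_handler):
    """Build a health response from TaskHandler's supervision API.

    Hypothetical helper for illustration; pair it with your web
    framework's response type of choice.
    """
    ok = task_handler.is_healthy()
    body = {"ok": ok, "workers": task_handler.get_worker_process_status()}
    # Return 503 when unhealthy so container orchestrators can restart the pod.
    return body, (200 if ok else 503)
```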

### Mitigation for intermittent HTTP/2 connection termination

The SDK uses `httpx` for outbound calls to the Conductor/Orkes server. By default, it enables HTTP/2 for these calls.
In some environments (certain proxies/load balancers/NATs), long-lived HTTP/2 connections may be terminated, which can
surface as errors like `httpcore.RemoteProtocolError: <ConnectionTerminated ...>`.

The SDK automatically attempts to recover by recreating the underlying HTTP client and retrying the request once.
If your environment is still unstable with HTTP/2, you can force the SDK to use HTTP/1.1 instead via an environment
variable.
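The recover-once strategy can be sketched generically (names here are illustrative stand-ins, not the SDK's internal code):

```python
class ConnectionTerminated(Exception):
    """Stand-in for errors like httpcore.RemoteProtocolError."""


def call_with_reconnect(make_client, do_request):
    """Run a request; on a terminated connection, rebuild the client and retry once.

    Illustrative sketch of the recovery behavior described above:
    `make_client` constructs a fresh HTTP client, `do_request` performs
    one request with it.
    """
    client = make_client()
    try:
        return do_request(client)
    except ConnectionTerminated:
        # Recreate the underlying HTTP client and retry exactly once.
        client = make_client()
        return do_request(client)
```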

#### `CONDUCTOR_HTTP2_ENABLED`

- **What it does**: Controls whether the Conductor Python SDK uses HTTP/2 for outbound requests to the Conductor server.
- **Default**: `true` (HTTP/2 enabled).
- **Scope**: Affects all SDK clients (workers, `OrkesClients`, sync + async). It does *not* change your FastAPI/Uvicorn
server behavior; it only changes how the SDK talks to Conductor.
- **Values**: `false|0|no|off` disables HTTP/2. Anything else enables it.

```shell
export CONDUCTOR_HTTP2_ENABLED=false
```
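A sketch of how such a truthy/falsy flag can be interpreted (the SDK's actual parsing may differ slightly):

```python
import os


def http2_enabled() -> bool:
    # `false|0|no|off` (case-insensitive) disables HTTP/2; anything else,
    # including an unset variable, enables it.
    value = os.getenv("CONDUCTOR_HTTP2_ENABLED", "true").strip().lower()
    return value not in ("false", "0", "no", "off")
```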

If you paste the above code in a file called main.py, you can launch the workers by running:
```shell
python3 main.py
```
3 changes: 2 additions & 1 deletion examples/README.md
@@ -25,6 +25,7 @@ python examples/workers_e2e.py
|------|-------------|-----|
| **workers_e2e.py** | ⭐ Start here - sync + async workers | `python examples/workers_e2e.py` |
| **worker_example.py** | Comprehensive patterns (None returns, TaskInProgress) | `python examples/worker_example.py` |
| **fastapi_worker_service.py** | FastAPI exposing a workflow as an API (+ workers) | `uvicorn examples.fastapi_worker_service:app --port 8081 --workers 1` |
| **worker_configuration_example.py** | Hierarchical configuration (env vars) | `python examples/worker_configuration_example.py` |
| **task_context_example.py** | Task context (logs, poll_count, task_id) | `python examples/task_context_example.py` |
| **task_workers.py** | Task worker patterns with dataclasses | `python examples/task_workers.py` |
@@ -416,4 +417,4 @@ export conductor.worker.all.thread_count=20
---

**Repository**: https://github.com/conductor-oss/conductor-python
**License**: Apache 2.0
**License**: Apache 2.0
213 changes: 213 additions & 0 deletions examples/fastapi_worker_service.py
@@ -0,0 +1,213 @@
"""
FastAPI + Conductor workers in one process.

Install (example-only deps):
pip install fastapi uvicorn

Run (single web worker; TaskHandler will spawn one process per Conductor worker):
export CONDUCTOR_SERVER_URL="http://localhost:8080/api"
export CONDUCTOR_AUTH_KEY="..."
export CONDUCTOR_AUTH_SECRET="..."
uvicorn examples.fastapi_worker_service:app --host 0.0.0.0 --port 8081 --workers 1

Trigger the workflow via API (waits up to 10s for completion):
curl -s -X POST http://localhost:8081/v1/hello \\
-H 'content-type: application/json' \\
-d '{"name":"Ada","a":2,"b":3}' | jq .

Notes:
- Do NOT run uvicorn with multiple web workers unless you explicitly want multiple independent TaskHandlers polling.
- TaskHandler supervision is enabled by default (monitor + restart worker subprocesses).
"""

from __future__ import annotations

import os
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.context.task_context import get_task_context
from conductor.client.orkes_clients import OrkesClients
from conductor.client.worker.worker_task import worker_task
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor


# ---------------------------------------------------------------------------
# Example worker(s)
# ---------------------------------------------------------------------------

@worker_task(
task_definition_name="fastapi_normalize_name",
poll_interval_millis=100,
register_task_def=True,
overwrite_task_def=False,
)
def normalize_name(name: str) -> str:
# This shows how to access task context safely.
_ = get_task_context()
return name.strip().title()


@worker_task(
task_definition_name="fastapi_add_numbers",
poll_interval_millis=100,
register_task_def=True,
overwrite_task_def=False,
)
def add_numbers(a: int, b: int) -> int:
_ = get_task_context()
return a + b


@worker_task(
task_definition_name="fastapi_build_message",
poll_interval_millis=100,
register_task_def=True,
overwrite_task_def=False,
)
def build_message(normalized_name: str, total: int) -> dict:
ctx = get_task_context()
return {
"message": f"Hello {normalized_name}! {total=}",
"normalized_name": normalized_name,
"total": total,
"task_id": ctx.get_task_id(),
"workflow_id": ctx.get_workflow_instance_id(),
}


def _build_hello_workflow(executor: WorkflowExecutor) -> ConductorWorkflow:
workflow = ConductorWorkflow(executor=executor, name="fastapi_hello_workflow", version=1)

t1 = normalize_name(task_ref_name="normalize_name_ref", name=workflow.input("name"))
t2 = add_numbers(task_ref_name="add_numbers_ref", a=workflow.input("a"), b=workflow.input("b"))
t3 = build_message(
task_ref_name="build_message_ref",
normalized_name=t1.output("result"),
total=t2.output("result"),
)

workflow >> t1 >> t2 >> t3

workflow.output_parameters(
output_parameters={
"message": t3.output("message"),
"normalized_name": t3.output("normalized_name"),
"total": t3.output("total"),
}
)

return workflow


class HelloRequest(BaseModel):
name: str = Field(default="World", description="Name to greet")
a: int = Field(default=1, description="First number")
b: int = Field(default=2, description="Second number")


# ---------------------------------------------------------------------------
# FastAPI app + TaskHandler lifecycle
# ---------------------------------------------------------------------------

task_handler: Optional[TaskHandler] = None
workflow_executor: Optional[WorkflowExecutor] = None
api_config: Optional[Configuration] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
global task_handler, workflow_executor, api_config

api_config = Configuration()
clients = OrkesClients(configuration=api_config)
workflow_executor = clients.get_workflow_executor()

# scan_for_annotated_workers=True will pick up @worker_task functions in this module.
task_handler = TaskHandler(
workers=[],
configuration=api_config,
scan_for_annotated_workers=True,
# Defaults are already True, but keeping these explicit in the example:
monitor_processes=True,
restart_on_failure=True,
)
task_handler.start_processes()

try:
yield
finally:
if task_handler is not None:
task_handler.stop_processes()
task_handler = None
workflow_executor = None
api_config = None


app = FastAPI(lifespan=lifespan)


@app.get("/healthcheck")
def healthcheck():
# 503 if worker processes aren't healthy; useful for container orchestrators.
if task_handler is None:
return JSONResponse({"ok": False, "detail": "workers_not_started"}, status_code=503)

ok = task_handler.is_healthy()
payload = {
"ok": ok,
"workers": task_handler.get_worker_process_status(),
}
return JSONResponse(payload, status_code=200 if ok else 503)


@app.post("/v1/hello")
def hello(req: HelloRequest):
"""
Expose a Conductor workflow as an API:
- Builds an inline workflow definition with 3 SIMPLE tasks
- Starts it and waits up to 10 seconds for completion
- Returns workflow output as the HTTP response
"""
if task_handler is None or workflow_executor is None or api_config is None:
return JSONResponse({"ok": False, "detail": "service_not_ready"}, status_code=503)
if not task_handler.is_healthy():
return JSONResponse(
{"ok": False, "detail": "workers_unhealthy", "workers": task_handler.get_worker_process_status()},
status_code=503,
)

workflow = _build_hello_workflow(executor=workflow_executor)
payload = req.model_dump() if hasattr(req, "model_dump") else req.dict() # pydantic v2/v1

try:
run = workflow.execute(workflow_input=payload, wait_for_seconds=10)
except Exception as e:
return JSONResponse({"ok": False, "detail": "workflow_start_failed", "error": str(e)}, status_code=502)

response = {
"ok": run.status == "COMPLETED",
"workflow_id": run.workflow_id,
"status": run.status,
"output": run.output,
"ui_url": f"{api_config.ui_host}/execution/{run.workflow_id}",
}
return JSONResponse(response, status_code=200 if run.status == "COMPLETED" else 202)


if __name__ == "__main__":
import uvicorn

uvicorn.run(
"examples.fastapi_worker_service:app",
host="0.0.0.0",
port=int(os.getenv("PORT", "8081")),
workers=1,
)
12 changes: 7 additions & 5 deletions src/conductor/client/automator/json_schema_generator.py
@@ -188,11 +188,13 @@ def _type_to_json_schema(type_hint, strict_schema: bool = False) -> Optional[Dic
     if len(non_none_args) == 1:
         # Optional[T] case
         inner_schema = _type_to_json_schema(non_none_args[0], strict_schema)
-        if inner_schema:
-            # For optional, we could use oneOf or just mark as nullable
-            # Using nullable for simplicity
-            inner_schema['nullable'] = True
-            return inner_schema
+        if inner_schema is not None:
+            # Draft-07 JSON Schema does not support OpenAPI's `nullable`.
+            # Represent Optional[T] as a union with null.
+            if inner_schema == {}:
+                # "Any" already includes null, so keep it minimal.
+                return inner_schema
+            return {"anyOf": [inner_schema, {"type": "null"}]}
     # Multiple non-None types in Union - too complex
     return None
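To make the change concrete, here is a hypothetical standalone helper mirroring the new `Optional[T]` handling, independent of the SDK's internals:

```python
from typing import Any, Dict


def optional_to_schema(inner: Dict[str, Any]) -> Dict[str, Any]:
    """Mirror of the Optional[T] handling added above (illustrative helper).

    Draft-07 JSON Schema has no `nullable` keyword, so Optional[T]
    becomes anyOf [T, null]; the empty "Any" schema already permits null.
    """
    if inner == {}:
        return inner
    return {"anyOf": [inner, {"type": "null"}]}
```

For example, `optional_to_schema({"type": "string"})` yields `{"anyOf": [{"type": "string"}, {"type": "null"}]}`, which validates both strings and `null` under Draft-07.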
