5 changes: 5 additions & 0 deletions sdk/agentserver/.gitignore
@@ -0,0 +1,5 @@
# Spec Kit
.specify/
specs/
.github/
.vscode/
31 changes: 31 additions & 0 deletions sdk/agentserver/azure-ai-agentserver-core/CHANGELOG.md
@@ -4,10 +4,41 @@

### Features Added

- **Durable long-running agents** — New `@durable_task` decorator and supporting types for building crash-resilient, long-running agents that survive container crashes, OOM kills, and redeployments. Key capabilities:
- **Lifecycle automation** — `.run()` and `.start()` automatically start, resume, or recover tasks based on their current state in the task store.
- **Entry mode awareness** — `ctx.entry_mode` tells the function whether it was entered `"fresh"`, `"resumed"` from suspension, or `"recovered"` from a crash.
- **Suspend & resume** — `ctx.suspend(output=..., reason=...)` pauses execution for multi-turn agent patterns (e.g., waiting for user input).
- **TaskResult wrapper** — `run()` and `result()` return `TaskResult[Output]` with `.is_completed` / `.is_suspended` properties, making suspension a normal return value instead of an exception.
- **Streaming** — `ctx.stream(chunk)` emits incremental output; consumers iterate with `async for chunk in task_run`.
- **Cancellation & timeout** — Cooperative cancel via `ctx.cancel` event, configurable `timeout`, and `terminate()` for forced shutdown.
- **RetryPolicy** — Configurable retry with factory presets: `.exponential_backoff()`, `.fixed_delay()`, `.linear_backoff()`, `.no_retry()`.
- **Source auto-stamping** — The framework automatically stamps every task with provenance metadata: `type` (`agentserver.durable_task`), `name` (the decorator `name` option — the stable identity anchor), and `server_version` (the `x-platform-server` header value). Source is framework-owned and not user-overridable. A reserved tag `_durable_task_name` is also auto-stamped for LIST API filtering by function name.
- **Callable factories** — `tags`, `title`, and `description` accept `Callable[[Input, task_id], T]` for dynamic metadata computed at task creation time.
- **TaskMetadata** — Dict-like mutable progress metadata (`ctx.metadata["key"] = value`) with debounced auto-flush to the task store. Supports `[]`, `in`, `for`, `len`, `del`, plus convenience methods `.increment()` and `.append()`.
- **Handle operations** — `TaskRun.metadata` for progress snapshot reads, `TaskRun.delete()` for task cleanup, `TaskRun.refresh()` for re-fetching state from the store, `TaskRun.lease_expiry_count` for monitoring ownership churn.
- **TaskContext.description** — `ctx.description` exposes the task description string within the running function.
- **Configurable shutdown grace** — `DurableTaskManager(shutdown_grace_seconds=25.0)` controls how long the manager waits for tasks to checkpoint before force-expiring leases during shutdown.
- **Task listing** — `my_task.list(status=...)` returns all tasks for a specific durable task function, automatically scoped by function name (via tag) and source type. Supports `status` and `session_id` filters.
- **Steerable durable tasks** — New `steerable=True` parameter on `@durable_task` enables mid-flight steering where new inputs can be queued while a task is still running. Key capabilities:
- **Input queue** — `start()` on an in-progress steerable task queues the new input and returns a `TaskRun` handle immediately, instead of raising `TaskConflictError`.
- **Cancel signal** — `ctx.cancel` is automatically set when new inputs arrive, giving the function a cooperative signal to short-circuit.
- **Automatic drain** — The framework drains the queue after the function suspends or completes, re-entering with the next queued input using `entry_mode="resumed"` and `was_steered=True`.
- **Superseded results** — Previous generation's `TaskRun.result()` resolves with `status="superseded"` and `is_superseded=True`.
- **Context enrichment** — `ctx.was_steered`, `ctx.previous_input`, `ctx.pending_inputs`, and `ctx.generation` provide full steering context.
- **Queue limits** — `max_pending` (default 10) prevents unbounded queue growth; raises `SteeringQueueFull` when exceeded.
- **Crash recovery** — `drain_in_progress` flag in persisted state enables recovery from mid-drain crashes.
- **Distributed steering** — Lease renewal loop polls for pending inputs from other processes and sets `ctx.cancel` accordingly.
- **Etag-aware completion** — Steerable tasks use optimistic concurrency on completion to detect concurrent steering.
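
The `max_pending` queue limit described above can be sketched in isolation. This is an illustrative stand-in for the framework's internal input queue, not the real implementation; apart from `SteeringQueueFull` and `max_pending`, the names here are assumptions:

```python
class SteeringQueueFull(Exception):
    """Raised when the pending-input queue would exceed max_pending."""


class PendingInputQueue:
    # Illustrative stand-in for the steering input queue.
    def __init__(self, max_pending: int = 10) -> None:
        self.max_pending = max_pending
        self._items: list = []

    def enqueue(self, item) -> None:
        # Reject new inputs once the queue is full, mirroring the
        # documented SteeringQueueFull behavior.
        if len(self._items) >= self.max_pending:
            raise SteeringQueueFull(
                f"queue already holds {self.max_pending} pending inputs"
            )
        self._items.append(item)

    def drain(self):
        # The framework drains queued inputs after the function
        # suspends or completes, re-entering with the next input.
        while self._items:
            yield self._items.pop(0)
```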

### Breaking Changes

- **`source` parameter removed** — The `source` keyword argument has been removed from `@durable_task()`, `.run()`, `.start()`, and `.options()`. Source provenance is now auto-stamped by the framework and cannot be overridden by developers. Use `tags` for custom metadata.

### Bugs Fixed

- **Local provider payload merge** — Fixed `_local_provider.py` to use strict shallow merge per Protocol Spec §11: root-level keys are now always replaced, not recursively merged. Previously nested dicts were merged with `dict.update()`, which was more forgiving than the real Task Storage API.
- **Task recovery routing** — `_find_resume_callback()` now matches by `source.name` (the auto-stamped function name) first, then falls back to a title prefix match. Previously it relied only on the fragile title-prefix heuristic.
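
The strict shallow merge described above can be sketched as follows (an illustrative summary of the rule, not the actual `_local_provider.py` code):

```python
def shallow_merge(existing: dict, patch: dict) -> dict:
    # Per Protocol Spec §11: root-level keys are replaced wholesale.
    # Nested dicts in `patch` overwrite, and are never recursively
    # merged with, the corresponding values in `existing`.
    merged = dict(existing)
    for key, value in patch.items():
        merged[key] = value
    return merged
```

With the old `dict.update()`-style recursive merge, patching `{"a": {"x": 1}}` with `{"a": {"y": 2}}` would have kept `x`; the strict merge replaces the whole value of `"a"`.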

### Other Changes

## 2.0.0b3 (2026-04-22)
49 changes: 49 additions & 0 deletions sdk/agentserver/azure-ai-agentserver-core/README.md
@@ -113,6 +113,54 @@ export APPLICATIONINSIGHTS_CONNECTION_STRING="InstrumentationKey=..."
python my_agent.py
```

### Durable long-running agents

The `@durable_task` decorator builds crash-resilient agents that survive container restarts, OOM kills, and redeployments. Task state is persisted to a task store, enabling automatic recovery and multi-turn suspend/resume patterns.

```python
from datetime import timedelta
from azure.ai.agentserver.core.durable import durable_task, TaskContext, RetryPolicy

@durable_task(
    timeout=timedelta(minutes=30),
    retry=RetryPolicy.exponential_backoff(max_attempts=3),
    tags={"priority": "high"},
)
async def process_document(ctx: TaskContext[dict]) -> dict:
    ctx.metadata["phase"] = "processing"
    result = await analyze(ctx.input["document_url"])
    ctx.metadata["phase"] = "complete"
    return {"summary": result}
```
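
`RetryPolicy.exponential_backoff(max_attempts=3)` retries failed runs with exponentially growing delays. The schedule is roughly the following (a sketch of the general technique; the parameter names `initial` and `factor` are assumptions, not the real `RetryPolicy` signature):

```python
def exponential_delays(
    initial: float = 1.0, factor: float = 2.0, max_attempts: int = 3
) -> list:
    # Delay before retry attempt n (0-based): initial * factor**n.
    return [initial * factor**n for n in range(max_attempts)]

print(exponential_delays())  # [1.0, 2.0, 4.0]
```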

**Start and await a task:**

```python
result = await process_document.run(task_id="doc-42", input={"document_url": "..."})
print(result.output) # {"summary": "..."}
```
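
`run()` returns a `TaskResult`, which makes suspension a normal return value rather than an exception. Its observable shape is roughly this (an illustrative stand-in dataclass, not the real class):

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

Output = TypeVar("Output")


@dataclass
class TaskResultSketch(Generic[Output]):
    # Illustrative stand-in for TaskResult[Output].
    status: str  # e.g. "completed" or "suspended"
    output: Optional[Output] = None

    @property
    def is_completed(self) -> bool:
        return self.status == "completed"

    @property
    def is_suspended(self) -> bool:
        return self.status == "suspended"
```

Callers branch on `is_completed` / `is_suspended` instead of wrapping `run()` in a try/except.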

**Multi-turn suspend/resume (e.g., conversational agents):**

```python
@durable_task()
async def chat_session(ctx: TaskContext[dict]) -> dict:
    message = ctx.input["message"]
    history = ctx.metadata.get("history", [])
    reply = await generate_reply(message, history)
    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": reply})
    ctx.metadata["history"] = history
    return await ctx.suspend(output={"reply": reply})

# Each call resumes the same session:
result = await chat_session.run(task_id="session-1", input={"message": "Hello"})
print(result.output) # {"reply": "Hi! How can I help?"}
print(result.is_suspended) # True
```
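
Tasks that emit incremental output via `ctx.stream(chunk)` are consumed with `async for chunk in task_run`. The consumption pattern looks like this minimal sketch, which uses a stand-in async iterator rather than a real `TaskRun`:

```python
import asyncio


async def fake_task_run():
    # Stand-in for a TaskRun handle yielding streamed chunks.
    for chunk in ("analyzing", "summarizing", "done"):
        yield chunk


async def consume() -> list:
    chunks = []
    # With a real handle this would be: async for chunk in task_run:
    async for chunk in fake_task_run():
        chunks.append(chunk)
    return chunks


print(asyncio.run(consume()))  # ['analyzing', 'summarizing', 'done']
```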

See the [Developer Guide](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-developer-guide.md) for the full API reference.

## Troubleshooting

### Logging
@@ -130,6 +178,7 @@ To report an issue with the client library, or request additional features, plea
## Next steps

- Install [`azure-ai-agentserver-invocations`](https://pypi.org/project/azure-ai-agentserver-invocations/) to add the invocation protocol endpoints.
- Read the [Durable Task Developer Guide](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-developer-guide.md) for crash-resilient long-running agents.
- See the [container image spec](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver) for the full hosted agent contract.

## Contributing
@@ -21,6 +21,7 @@
trace_stream,
)
"""

__path__ = __import__("pkgutil").extend_path(__path__, __name__)

from ._base import AgentServerHost
@@ -160,21 +160,36 @@ class MyHost(InvocationAgentServerHost, ResponsesAgentServerHost):

_DEFAULT_ACCESS_LOG_FORMAT = '%(h)s "%(r)s" %(s)s %(b)s %(D)sμs'

def __init__(
def __init__( # pylint: disable=too-many-statements
self,
*,
applicationinsights_connection_string: Optional[str] = None,
graceful_shutdown_timeout: Optional[int] = None,
log_level: Optional[str] = None,
access_log: Optional[logging.Logger] = _SENTINEL_ACCESS_LOG, # type: ignore[assignment]
access_log_format: Optional[str] = None,
configure_observability: Optional[Callable[..., None]] = _tracing.configure_observability,
configure_observability: Optional[
Callable[..., None]
] = _tracing.configure_observability,
routes: Optional[list[Route]] = None,
**kwargs: Any,
) -> None:
# Shutdown handler slot (server-level lifecycle) -------------------
self._shutdown_fn: Optional[Callable[[], Awaitable[None]]] = None

# Durable task manager (optional — enabled by default) ----
self._durable_task_manager: Optional[Any] = None
try:
from .durable._manager import ( # pylint: disable=import-outside-toplevel
DurableTaskManager,
)

self._durable_task_manager = DurableTaskManager(
_config.AgentConfig.from_env()
)
except Exception: # pylint: disable=broad-exception-caught
pass # durable tasks not available — continue without

# Server version segments for the x-platform-server header.
# Protocol packages call register_server_version() to add their
# own portion; the middleware joins them at response time.
@@ -187,7 +202,10 @@ def __init__(
self.config: _config.AgentConfig = _config.AgentConfig.from_env()

# Observability (logging + tracing) --------------------------------
_conn_str = applicationinsights_connection_string or self.config.appinsights_connection_string
_conn_str = (
applicationinsights_connection_string
or self.config.appinsights_connection_string
)
if configure_observability is not None:
try:
configure_observability(
@@ -197,13 +215,18 @@ def __init__(
except ValueError:
raise # invalid log_level etc. — user should fix their config
except Exception: # pylint: disable=broad-exception-caught
logger.warning("Failed to initialize observability; continuing without it.", exc_info=True)
logger.warning(
"Failed to initialize observability; continuing without it.",
exc_info=True,
)

# Access logging ---------------------------------------------------
self._access_log: Optional[logging.Logger] = (
logger if access_log is _SENTINEL_ACCESS_LOG else access_log
)
self._access_log_format: str = access_log_format or self._DEFAULT_ACCESS_LOG_FORMAT
self._access_log_format: str = (
access_log_format or self._DEFAULT_ACCESS_LOG_FORMAT
)

# Timeouts ---------------------------------------------------------
self._graceful_shutdown_timeout = _config.resolve_graceful_shutdown_timeout(
@@ -212,7 +235,9 @@ def __init__(

# Build lifespan context manager
@contextlib.asynccontextmanager
async def _lifespan(_app: Starlette) -> AsyncGenerator[None, None]: # noqa: RUF029
async def _lifespan(
_app: Starlette,
) -> AsyncGenerator[None, None]: # noqa: RUF029
logger.info("AgentServerHost started")

# --- Startup configuration logging ---
@@ -225,28 +250,55 @@ async def _lifespan(_app: Starlette) -> AsyncGenerator[None, None]: # noqa: RUF
cfg.agent_version or _NOT_SET,
cfg.port,
cfg.session_id or _NOT_SET,
cfg.sse_keepalive_interval if cfg.sse_keepalive_interval > 0 else "disabled",
(
cfg.sse_keepalive_interval
if cfg.sse_keepalive_interval > 0
else "disabled"
),
)
logger.info(
"Connectivity: project_endpoint=%s, otlp_endpoint=%s, appinsights_configured=%s",
_mask_uri(cfg.project_endpoint),
_mask_uri(cfg.otlp_endpoint),
bool(cfg.appinsights_connection_string),
)
protocols = ", ".join(self._server_version_segments) if self._server_version_segments else _NOT_SET
protocols = (
", ".join(self._server_version_segments)
if self._server_version_segments
else _NOT_SET
)
logger.info(
"Host options: shutdown_timeout=%ss, protocols=%s",
self._graceful_shutdown_timeout,
protocols,
)

# --- Durable task manager startup ---
if self._durable_task_manager is not None:
from .durable._manager import ( # pylint: disable=import-outside-toplevel
set_task_manager,
)

set_task_manager(self._durable_task_manager)
await self._durable_task_manager.startup()

yield

# --- SHUTDOWN: runs once when the server is stopping ---
logger.info(
"AgentServerHost shutting down (graceful timeout=%ss)",
self._graceful_shutdown_timeout,
)

# Durable task manager shutdown
if self._durable_task_manager is not None:
try:
await self._durable_task_manager.shutdown()
except Exception: # pylint: disable=broad-exception-caught
logger.warning(
"Error shutting down durable task manager", exc_info=True
)

if self._graceful_shutdown_timeout == 0:
logger.info("Graceful shutdown drain period disabled (timeout=0)")
else:
@@ -263,11 +315,22 @@ async def _lifespan(_app: Starlette) -> AsyncGenerator[None, None]: # noqa: RUF
except Exception: # pylint: disable=broad-exception-caught
logger.warning("Error in on_shutdown", exc_info=True)

# Merge routes: subclass routes (if any) + health endpoint
# Merge routes: subclass routes (if any) + health endpoint + durable tasks
all_routes: list[Any] = list(routes or [])
all_routes.append(
Route("/readiness", self._readiness_endpoint, methods=["GET"], name="readiness"),
Route(
"/readiness",
self._readiness_endpoint,
methods=["GET"],
name="readiness",
),
)
if self._durable_task_manager is not None:
from .durable._resume_route import ( # pylint: disable=import-outside-toplevel
create_resume_route,
)

all_routes.append(create_resume_route())

# Initialize Starlette with combined routes, lifespan, and middleware
super().__init__(
@@ -380,7 +443,9 @@ def request_span(
# Shutdown handler (server-level lifecycle)
# ------------------------------------------------------------------

def shutdown_handler(self, fn: Callable[[], Awaitable[None]]) -> Callable[[], Awaitable[None]]:
def shutdown_handler(
self, fn: Callable[[], Awaitable[None]]
) -> Callable[[], Awaitable[None]]:
"""Register a function as the shutdown handler.

:param fn: Async function called during graceful shutdown.
@@ -455,7 +520,9 @@ def _handle_sigterm(_signum: int, _frame: Any) -> None:
finally:
signal.signal(signal.SIGTERM, original_sigterm)

async def run_async(self, host: str = "0.0.0.0", port: Optional[int] = None) -> None:
async def run_async(
self, host: str = "0.0.0.0", port: Optional[int] = None
) -> None:
"""Start the server asynchronously (awaitable).

:param host: Network interface to bind. Defaults to ``"0.0.0.0"``.
@@ -474,7 +541,9 @@ async def run_async(self, host: str = "0.0.0.0", port: Optional[int] = None) ->
# Health endpoint
# ------------------------------------------------------------------

async def _readiness_endpoint(self, request: Request) -> Response: # pylint: disable=unused-argument
async def _readiness_endpoint(
self, request: Request
) -> Response: # pylint: disable=unused-argument
"""GET /readiness — readiness check endpoint.

:param request: The incoming Starlette request.
@@ -516,7 +585,9 @@ async def sse_keepalive_stream(
if pending is None:
pending = asyncio.ensure_future(ait.__anext__())
try:
chunk = await asyncio.wait_for(asyncio.shield(pending), timeout=interval)
chunk = await asyncio.wait_for(
asyncio.shield(pending), timeout=interval
)
pending = None # consumed — create new task next iteration
yield chunk
except asyncio.TimeoutError:
@@ -120,7 +120,8 @@ def from_env(cls) -> Self:
session_id=os.environ.get(_ENV_FOUNDRY_AGENT_SESSION_ID, ""),
port=resolve_port(None),
appinsights_connection_string=os.environ.get(
_ENV_APPLICATIONINSIGHTS_CONNECTION_STRING, ""),
_ENV_APPLICATIONINSIGHTS_CONNECTION_STRING, ""
),
otlp_endpoint=os.environ.get(_ENV_OTEL_EXPORTER_OTLP_ENDPOINT, ""),
sse_keepalive_interval=resolve_sse_keepalive_interval(None),
)
@@ -158,9 +159,7 @@ def _require_int(name: str, value: object) -> int:
:raises ValueError: If *value* is not an integer.
"""
if isinstance(value, bool) or not isinstance(value, int):
raise ValueError(
f"Invalid value for {name}: {value!r} (expected an integer)"
)
raise ValueError(f"Invalid value for {name}: {value!r} (expected an integer)")
return value


@@ -176,9 +175,7 @@ def _validate_port(value: int, source: str) -> int:
:raises ValueError: If the port is outside 1-65535.
"""
if not 1 <= value <= 65535:
raise ValueError(
f"Invalid value for {source}: {value} (expected 1-65535)"
)
raise ValueError(f"Invalid value for {source}: {value} (expected 1-65535)")
return value


@@ -239,9 +236,7 @@ def resolve_appinsights_connection_string(
"""
if connection_string is not None:
return connection_string
return os.environ.get(
_ENV_APPLICATIONINSIGHTS_CONNECTION_STRING
)
return os.environ.get(_ENV_APPLICATIONINSIGHTS_CONNECTION_STRING)


def resolve_log_level(level: Optional[str]) -> str:
@@ -58,6 +58,4 @@ def create_error_response(
body["type"] = error_type
if details is not None:
body["details"] = details
return JSONResponse(
{"error": body}, status_code=status_code, headers=headers
)
return JSONResponse({"error": body}, status_code=status_code, headers=headers)