From d8055c78bac2d17e16112a2d11d5acdaf529b041 Mon Sep 17 00:00:00 2001 From: imengby Date: Sun, 10 May 2026 00:27:05 +0100 Subject: [PATCH 1/3] feat: add AgentManifest for declarative capability profiles Add optional AgentManifest to the Agent class that formally declares input requirements, output types, trigger conditions, and tool capabilities. This enables platforms to manage agent lifecycle, resolve dependencies automatically, and compose agents into chains without custom wiring. New files: - src/strands/agent/manifest.py: AgentManifest, InputContract, OutputContract, Trigger dataclasses with serialization support - tests/strands/agent/test_manifest.py: 24 unit tests - rfcs/0001-agent-manifest.md: Design rationale and usage examples Modified files: - src/strands/agent/agent.py: Added manifest parameter to Agent.__init__ - src/strands/agent/__init__.py: Export new classes The manifest is purely declarative metadata. It does not change agent execution behavior and has zero impact on prompt caching. Existing agents without manifests work exactly as before. --- rfcs/0001-agent-manifest.md | 254 +++++++++++++++++++++ src/strands/agent/__init__.py | 7 +- src/strands/agent/agent.py | 3 + src/strands/agent/manifest.py | 319 +++++++++++++++++++++++++++ tests/strands/agent/test_manifest.py | 279 +++++++++++++++++++++++ 5 files changed, 861 insertions(+), 1 deletion(-) create mode 100644 rfcs/0001-agent-manifest.md create mode 100644 src/strands/agent/manifest.py create mode 100644 tests/strands/agent/test_manifest.py diff --git a/rfcs/0001-agent-manifest.md b/rfcs/0001-agent-manifest.md new file mode 100644 index 000000000..ca36a7302 --- /dev/null +++ b/rfcs/0001-agent-manifest.md @@ -0,0 +1,254 @@ +# RFC 0001: Agent Manifest — Declarative Capability Profile + +## Summary + +Add an optional `AgentManifest` to the `Agent` class that formally declares an agent's input requirements, output types, trigger conditions, and tool capabilities. This enables platforms to manage agent lifecycle, resolve dependencies automatically, and compose agents into chains without custom wiring. + +## Motivation + +### Problem + +Today, a Strands agent is defined by its runtime configuration: model, tools, system prompt. There is no way to declare what an agent *needs* (data sources, features, upstream event types) or what it *produces* (output schemas, downstream events) without inspecting its code or documentation. This means every multi-agent deployment requires manual dependency tracking, custom wiring between agents, and runtime failures as the only signal that something is missing. + +This creates three problems in production multi-agent systems: + +1. **No dependency declaration**: A platform cannot determine whether an agent's required inputs are available without running it and observing failures. +2. **No composability contract**: Two agents cannot be chained without custom integration code because neither declares its input/output schema. +3. **No lifecycle management**: An agent cannot be registered in advance and activated later when its dependencies become available. + +### Real-World Example + +In a multi-agent data processing platform, an Enrichment Agent needs two inputs: a `RawDataEvent` from an Ingestion Agent and a `customer_profile` feature from a Feature Store. Today, this agent must be deployed and configured manually after both dependencies exist. With a manifest, the agent can be registered as dormant and activate automatically when both inputs become available — no redeployment, no manual wiring. + +### Use Cases + +- **Multi-agent platforms**: Agents declare what they need; the platform resolves bindings and activates agents when dependencies are met. +- **Agent registries and catalogs**: Manifests serve as the catalog entry for agent discovery — what does this agent do, what does it need, what does it produce. +- **Scenario and simulation orchestration**: A platform determines which agents to invoke for a given scenario by matching available data against agent manifests. +- **Tool capability abstraction**: An agent declares it needs a `data_query` capability, not a specific PostgreSQL tool. The platform resolves the concrete binding per deployment environment. +- **CI/CD for agents**: Manifests enable automated validation that an agent's declared dependencies are satisfiable before deployment. + +## Design + +### New Classes + +```python +# src/strands/agent/manifest.py + +from dataclasses import dataclass, field +from typing import Optional + +@dataclass +class InputContract: + """Declares what an agent requires to execute.""" + features_required: list[str] = field(default_factory=list) + data_sources: list[str] = field(default_factory=list) + events_consumed: list[str] = field(default_factory=list) + tool_capabilities: list[str] = field(default_factory=list) + knowledge_bases: list[str] = field(default_factory=list) + +@dataclass +class OutputContract: + """Declares what an agent produces.""" + events_produced: list[str] = field(default_factory=list) + features_produced: list[str] = field(default_factory=list) + artifacts_produced: list[str] = field(default_factory=list) + +@dataclass +class Trigger: + """Declares what activates this agent.""" + type: str # "event", "schedule", "on_demand" + condition: Optional[str] = None # event type filter or cron expression + +@dataclass +class AgentManifest: + """Declarative capability profile for an agent. + + A manifest describes what an agent needs (inputs), what it produces (outputs), + what activates it (trigger), and what scenarios it supports. It is purely + declarative metadata — it does not change agent execution behavior. External + systems (registries, orchestrators, platforms) read manifests to manage agent + lifecycle and composition. + """ + name: str + version: str + domain: Optional[str] = None + description: Optional[str] = None + + input_contract: InputContract = field(default_factory=InputContract) + output_contract: OutputContract = field(default_factory=OutputContract) + trigger: Optional[Trigger] = None + scenario_types: list[str] = field(default_factory=list) + tags: dict[str, str] = field(default_factory=dict) + + def to_dict(self) -> dict: + """Serialize manifest to dictionary for registry registration.""" + ... + + @classmethod + def from_dict(cls, data: dict) -> "AgentManifest": + """Deserialize manifest from dictionary.""" + ... + + @classmethod + def from_file(cls, path: str) -> "AgentManifest": + """Load manifest from JSON or YAML file.""" + ... +``` + +### Agent Class Changes + +```python +# In Agent.__init__, add optional manifest parameter: + +class Agent(AgentBase): + def __init__( + self, + ..., + manifest: AgentManifest | None = None, # NEW + ... + ): +``` + +The manifest is purely declarative. It does not change agent execution behavior. It is metadata that external systems (registries, orchestrators, platforms) can read to manage the agent's lifecycle. + +### Usage Examples + +#### Basic: Agent with manifest + +```python +from strands import Agent, tool +from strands.agent.manifest import AgentManifest, InputContract, OutputContract, Trigger + +@tool +def analyze_data(query: str) -> dict: + """Run analysis on structured data.""" + ... + +agent = Agent( + manifest=AgentManifest( + name="data_analysis_agent", + version="1.0.0", + domain="analytics", + description="Analyzes structured datasets and produces insight reports", + input_contract=InputContract( + features_required=["dataset_schema", "recent_metrics"], + events_consumed=["DataReadyEvent"], + tool_capabilities=["data_query", "chart_generation"], + ), + output_contract=OutputContract( + events_produced=["InsightEvent"], + artifacts_produced=["analysis_report"], + ), + trigger=Trigger(type="event", condition="DataReadyEvent"), + ), + system_prompt="You are a data analysis agent...", + tools=[analyze_data], +) + +# Access manifest programmatically +print(agent.manifest.input_contract.features_required) +# ['dataset_schema', 'recent_metrics'] + +# Serialize for registry registration +manifest_dict = agent.manifest.to_dict() +``` + +#### Multi-agent composition via manifests + +```python +# Agent A produces events that Agent B consumes +# No direct wiring needed — a platform reads both manifests and connects them + +ingestion_agent = Agent( + manifest=AgentManifest( + name="ingestion_agent", + version="1.0.0", + output_contract=OutputContract(events_produced=["RawDataEvent"]), + ), + ... +) + +enrichment_agent = Agent( + manifest=AgentManifest( + name="enrichment_agent", + version="1.0.0", + input_contract=InputContract(events_consumed=["RawDataEvent"]), + output_contract=OutputContract(events_produced=["EnrichedDataEvent"]), + trigger=Trigger(type="event", condition="RawDataEvent"), + ), + ... +) + +# A platform reads both manifests and knows: +# ingestion_agent.output → RawDataEvent → enrichment_agent.input +# No custom integration code needed. +``` + +#### File-based manifest + +```json +{ + "name": "enrichment_agent", + "version": "1.0.0", + "domain": "data_pipeline", + "input_contract": { + "features_required": ["customer_profile"], + "events_consumed": ["RawDataEvent"], + "tool_capabilities": ["data_query", "enrichment_api"] + }, + "output_contract": { + "events_produced": ["EnrichedDataEvent"] + }, + "trigger": {"type": "event", "condition": "RawDataEvent"} +} +``` + +```python +from strands.agent.manifest import AgentManifest + +agent = Agent( + manifest=AgentManifest.from_file("manifest.json"), + system_prompt="...", + tools=[...], +) +``` + +## When to Use + +Use `AgentManifest` when: +- Your agent is part of a multi-agent system where agents need to discover each other's capabilities +- You want to register agents in a catalog or registry with formal input/output declarations +- You need lifecycle management (agents that activate when dependencies are met) +- You want to enable orchestration where a platform selects agents based on their declared capabilities +- You want to abstract tool requirements from concrete implementations (capability names vs specific tools) +- You want CI/CD validation that an agent's dependencies are satisfiable before deployment + +Do NOT use `AgentManifest` when: +- You have a simple single-agent application +- Your agent's inputs and outputs are ad-hoc and not formalized +- You don't need external lifecycle management or registry integration + +## Backward Compatibility + +- `manifest` parameter is optional and defaults to `None` +- Existing agents without manifests work exactly as before +- No breaking changes to any existing API +- No new required dependencies + +## Alternatives Considered + +1. **Separate manifest file only (no SDK integration)**: Rejected because it creates drift between the manifest and the actual agent configuration. Keeping the manifest in the Agent class ensures they stay in sync. +2. **Infer manifest from tools list**: Rejected because tool capabilities are abstract (what the agent needs conceptually) while tools are concrete (what's currently bound). An agent may need `data_query` capability but have no tool bound yet — that's the dormant state. +3. **Embed in system_prompt**: Rejected because manifests need to be machine-readable for registry/platform integration, not just LLM-readable. +4. **Use agent description field**: Rejected because description is free-text for humans. Manifests are structured data for machines. + +## Implementation Plan + +1. Add `src/strands/agent/manifest.py` with dataclass definitions +2. Add `manifest` parameter to `Agent.__init__` (optional, defaults to None) +3. Store manifest as `self.manifest` attribute on the Agent instance +4. Add serialization: `to_dict()`, `from_dict()`, `from_file()` (JSON and YAML support) +5. Export `AgentManifest`, `InputContract`, `OutputContract`, `Trigger` from `strands.agent` +6. Add unit tests for manifest creation, serialization, and Agent integration +7. Add documentation page explaining the concept and usage patterns diff --git a/src/strands/agent/__init__.py b/src/strands/agent/__init__.py index c901e800f..492734fe6 100644 --- a/src/strands/agent/__init__.py +++ b/src/strands/agent/__init__.py @@ -19,16 +19,21 @@ SlidingWindowConversationManager, SummarizingConversationManager, ) +from .manifest import AgentManifest, InputContract, OutputContract, Trigger __all__ = [ "Agent", "AgentBase", + "AgentManifest", "AgentResult", "ConversationManager", + "InputContract", + "ModelRetryStrategy", "NullConversationManager", + "OutputContract", "SlidingWindowConversationManager", "SummarizingConversationManager", - "ModelRetryStrategy", + "Trigger", ] diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py index 965969961..724356e29 100644 --- a/src/strands/agent/agent.py +++ b/src/strands/agent/agent.py @@ -79,6 +79,7 @@ NullConversationManager, SlidingWindowConversationManager, ) +from .manifest import AgentManifest from .state import AgentState logger = logging.getLogger(__name__) @@ -146,6 +147,7 @@ def __init__( tool_executor: ToolExecutor | None = None, retry_strategy: ModelRetryStrategy | _DefaultRetryStrategySentinel | None = _DEFAULT_RETRY_STRATEGY, concurrent_invocation_mode: ConcurrentInvocationMode = ConcurrentInvocationMode.THROW, + manifest: AgentManifest | None = None, ): """Initialize the Agent with the specified configuration. @@ -227,6 +229,7 @@ def __init__( self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT) self.name = name or _DEFAULT_AGENT_NAME self.description = description + self.manifest = manifest # If not provided, create a new PrintingCallbackHandler instance # If explicitly set to None, use null_callback_handler diff --git a/src/strands/agent/manifest.py b/src/strands/agent/manifest.py new file mode 100644 index 000000000..1d1bcd01f --- /dev/null +++ b/src/strands/agent/manifest.py @@ -0,0 +1,319 @@ +"""Agent Manifest — Declarative Capability Profile. + +This module provides the AgentManifest class and supporting dataclasses that allow agents to formally +declare their input requirements, output types, trigger conditions, and tool capabilities. + +A manifest is purely declarative metadata. It does not change agent execution behavior. External systems +(registries, orchestrators, platforms) read manifests to manage agent lifecycle and composition. + +Example: + >>> from strands.agent.manifest import AgentManifest, InputContract, OutputContract, Trigger + >>> manifest = AgentManifest( + ... name="data_analysis_agent", + ... version="1.0.0", + ... input_contract=InputContract( + ... features_required=["dataset_schema"], + ... events_consumed=["DataReadyEvent"], + ... tool_capabilities=["data_query"], + ... ), + ... output_contract=OutputContract(events_produced=["InsightEvent"]), + ... trigger=Trigger(type="event", condition="DataReadyEvent"), + ... ) +""" + +import json +import logging +from dataclasses import asdict, dataclass, field +from pathlib import Path +from typing import Any, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class InputContract: + """Declares what an agent requires to execute. + + Attributes: + features_required: Named features the agent needs from a feature store or data source. + data_sources: Named data sources the agent reads from (databases, APIs, streams). + events_consumed: Event types that this agent processes as input. + tool_capabilities: Abstract tool capability names the agent requires (not concrete tool implementations). + knowledge_bases: Named knowledge bases the agent queries during reasoning. + """ + + features_required: list[str] = field(default_factory=list) + data_sources: list[str] = field(default_factory=list) + events_consumed: list[str] = field(default_factory=list) + tool_capabilities: list[str] = field(default_factory=list) + knowledge_bases: list[str] = field(default_factory=list) + + +@dataclass +class OutputContract: + """Declares what an agent produces. + + Attributes: + events_produced: Event types that this agent emits as output. + features_produced: Named features that this agent computes and writes. + artifacts_produced: Named artifacts (files, reports, records) that this agent creates. + """ + + events_produced: list[str] = field(default_factory=list) + features_produced: list[str] = field(default_factory=list) + artifacts_produced: list[str] = field(default_factory=list) + + +@dataclass +class Trigger: + """Declares what activates this agent. + + Attributes: + type: The trigger mechanism. One of "event", "schedule", or "on_demand". + condition: For "event" type: the event type filter. For "schedule" type: a cron expression. + For "on_demand" type: None (activated by explicit invocation). + """ + + type: str # "event", "schedule", "on_demand" + condition: Optional[str] = None + + +@dataclass +class AgentManifest: + """Declarative capability profile for an agent. + + A manifest describes what an agent needs (inputs), what it produces (outputs), + what activates it (trigger), and what scenarios it supports. It is purely + declarative metadata that does not change agent execution behavior. + + External systems (registries, orchestrators, platforms) read manifests to: + - Determine if an agent's dependencies are satisfiable + - Manage agent lifecycle (dormant/active states) + - Compose agents into chains by matching outputs to inputs + - Select agents for scenario execution based on capabilities + + Attributes: + name: Unique identifier for this agent. + version: Semantic version string (e.g., "1.0.0"). + domain: Optional domain or category this agent belongs to. + description: Human-readable description of what this agent does. + input_contract: What this agent requires to execute. + output_contract: What this agent produces. + trigger: What activates this agent. + scenario_types: List of scenario type identifiers this agent supports. + tags: Arbitrary key-value metadata for filtering and discovery. + + Example: + >>> manifest = AgentManifest( + ... name="enrichment_agent", + ... version="1.0.0", + ... domain="data_pipeline", + ... input_contract=InputContract( + ... events_consumed=["RawDataEvent"], + ... tool_capabilities=["data_query", "enrichment_api"], + ... ), + ... output_contract=OutputContract(events_produced=["EnrichedDataEvent"]), + ... trigger=Trigger(type="event", condition="RawDataEvent"), + ... ) + """ + + name: str + version: str + domain: Optional[str] = None + description: Optional[str] = None + + input_contract: InputContract = field(default_factory=InputContract) + output_contract: OutputContract = field(default_factory=OutputContract) + trigger: Optional[Trigger] = None + scenario_types: list[str] = field(default_factory=list) + tags: dict[str, str] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + """Serialize manifest to a dictionary. + + Returns: + Dictionary representation of the manifest, suitable for JSON serialization + or registry registration. + """ + data = asdict(self) + # Remove None values for cleaner serialization + if data.get("trigger") is None: + del data["trigger"] + if data.get("domain") is None: + del data["domain"] + if data.get("description") is None: + del data["description"] + return data + + def to_json(self, indent: int = 2) -> str: + """Serialize manifest to a JSON string. + + Args: + indent: Number of spaces for JSON indentation. Defaults to 2. + + Returns: + JSON string representation of the manifest. + """ + return json.dumps(self.to_dict(), indent=indent) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "AgentManifest": + """Deserialize manifest from a dictionary. + + Args: + data: Dictionary containing manifest fields. + + Returns: + AgentManifest instance. + """ + input_contract = InputContract(**data.get("input_contract", {})) + output_contract = OutputContract(**data.get("output_contract", {})) + trigger = Trigger(**data["trigger"]) if data.get("trigger") else None + + return cls( + name=data["name"], + version=data["version"], + domain=data.get("domain"), + description=data.get("description"), + input_contract=input_contract, + output_contract=output_contract, + trigger=trigger, + scenario_types=data.get("scenario_types", []), + tags=data.get("tags", {}), + ) + + @classmethod + def from_file(cls, path: str | Path) -> "AgentManifest": + """Load manifest from a JSON file. + + Args: + path: Path to the manifest JSON file. + + Returns: + AgentManifest instance. + + Raises: + FileNotFoundError: If the manifest file does not exist. + json.JSONDecodeError: If the file contains invalid JSON. + """ + path = Path(path) + with open(path) as f: + data = json.load(f) + return cls.from_dict(data) + + def satisfies(self, available_events: list[str] | None = None, + available_features: list[str] | None = None, + available_capabilities: list[str] | None = None) -> tuple[bool, list[str]]: + """Check if this manifest's input requirements can be satisfied by available resources. + + Args: + available_events: List of event types currently available in the system. + available_features: List of feature names currently available. + available_capabilities: List of tool capability names currently registered. + + Returns: + Tuple of (is_satisfied, unmet_requirements). If is_satisfied is True, + all input requirements are met. If False, unmet_requirements lists what's missing. + """ + unmet: list[str] = [] + + if available_events is not None: + for event in self.input_contract.events_consumed: + if event not in available_events: + unmet.append(f"event:{event}") + + if available_features is not None: + for feature in self.input_contract.features_required: + if feature not in available_features: + unmet.append(f"feature:{feature}") + + if available_capabilities is not None: + for cap in self.input_contract.tool_capabilities: + if cap not in available_capabilities: + unmet.append(f"capability:{cap}") + + return (len(unmet) == 0, unmet) + + +# ───────────────────────────────────────────────────────────────────────────── +# Usage Examples +# ───────────────────────────────────────────────────────────────────────────── +# +# Example 1: Data pipeline agent that enriches raw events +# +# from strands import Agent, tool +# from strands.agent.manifest import AgentManifest, InputContract, OutputContract, Trigger +# +# @tool +# def enrich_record(record_id: str) -> dict: +# """Look up additional context for a record.""" +# ... +# +# enrichment_agent = Agent( +# manifest=AgentManifest( +# name="enrichment_agent", +# version="1.0.0", +# domain="data_pipeline", +# description="Enriches raw ingestion events with contextual metadata", +# input_contract=InputContract( +# events_consumed=["RawIngestionEvent"], +# features_required=["entity_profile"], +# tool_capabilities=["lookup_api"], +# ), +# output_contract=OutputContract( +# events_produced=["EnrichedEvent"], +# ), +# trigger=Trigger(type="event", condition="RawIngestionEvent"), +# ), +# system_prompt="You enrich raw data records with contextual metadata...", +# tools=[enrich_record], +# ) +# +# +# Example 2: Checking if an agent's dependencies are satisfiable +# +# manifest = enrichment_agent.manifest +# satisfied, gaps = manifest.satisfies( +# available_events=["RawIngestionEvent"], +# available_features=[], # entity_profile not yet available +# available_capabilities=["lookup_api"], +# ) +# # satisfied = False +# # gaps = ["feature:entity_profile"] +# +# +# Example 3: Multi-agent composition — platform reads manifests to auto-wire +# +# # Agent A produces "AnalysisCompleteEvent" +# # Agent B consumes "AnalysisCompleteEvent" +# # A platform reads both manifests and connects them without custom code: +# # +# # for agent in registered_agents: +# # for event in agent.manifest.output_contract.events_produced: +# # downstream = [a for a in registered_agents +# # if event in a.manifest.input_contract.events_consumed] +# # register_event_route(event, downstream) +# +# +# Example 4: Loading manifest from a file (useful for CI/CD validation) +# +# # manifest.json: +# # { +# # "name": "report_generator", +# # "version": "2.1.0", +# # "input_contract": { +# # "features_required": ["monthly_metrics", "comparison_baseline"], +# # "tool_capabilities": ["chart_generation", "pdf_export"] +# # }, +# # "output_contract": { +# # "artifacts_produced": ["monthly_report.pdf"] +# # }, +# # "trigger": {"type": "schedule", "condition": "0 9 1 * *"} +# # } +# # +# # agent = Agent( +# # manifest=AgentManifest.from_file("manifest.json"), +# # system_prompt="...", +# # tools=[...], +# # ) + diff --git a/tests/strands/agent/test_manifest.py b/tests/strands/agent/test_manifest.py new file mode 100644 index 000000000..7c4e6d901 --- /dev/null +++ b/tests/strands/agent/test_manifest.py @@ -0,0 +1,279 @@ +"""Tests for AgentManifest.""" + +import json +import tempfile +from pathlib import Path + +import pytest + +from strands.agent.manifest import AgentManifest, InputContract, OutputContract, Trigger + + +class TestInputContract: + """Tests for InputContract dataclass.""" + + def test_default_empty(self): + contract = InputContract() + assert contract.features_required == [] + assert contract.data_sources == [] + assert contract.events_consumed == [] + assert contract.tool_capabilities == [] + assert contract.knowledge_bases == [] + + def test_with_values(self): + contract = InputContract( + features_required=["feature_a", "feature_b"], + events_consumed=["EventX"], + tool_capabilities=["data_query"], + ) + assert contract.features_required == ["feature_a", "feature_b"] + assert contract.events_consumed == ["EventX"] + assert contract.tool_capabilities == ["data_query"] + + +class TestOutputContract: + """Tests for OutputContract dataclass.""" + + def test_default_empty(self): + contract = OutputContract() + assert contract.events_produced == [] + assert contract.features_produced == [] + assert contract.artifacts_produced == [] + + def test_with_values(self): + contract = OutputContract( + events_produced=["ResultEvent"], + artifacts_produced=["report.pdf"], + ) + assert contract.events_produced == ["ResultEvent"] + assert contract.artifacts_produced == ["report.pdf"] + + +class TestTrigger: + """Tests for Trigger dataclass.""" + + def test_event_trigger(self): + trigger = Trigger(type="event", condition="DataReadyEvent") + assert trigger.type == "event" + assert trigger.condition == "DataReadyEvent" + + def test_schedule_trigger(self): + trigger = Trigger(type="schedule", condition="*/15 * * * *") + assert trigger.type == "schedule" + assert trigger.condition == "*/15 * * * *" + + def test_on_demand_trigger(self): + trigger = Trigger(type="on_demand") + assert trigger.type == "on_demand" + assert trigger.condition is None + + +class TestAgentManifest: + """Tests for AgentManifest dataclass.""" + + def _sample_manifest(self) -> AgentManifest: + return AgentManifest( + name="test_agent", + version="1.0.0", + domain="testing", + description="A test agent", + input_contract=InputContract( + features_required=["feature_a"], + events_consumed=["InputEvent"], + tool_capabilities=["data_query", "file_write"], + ), + output_contract=OutputContract( + events_produced=["OutputEvent"], + features_produced=["computed_metric"], + ), + trigger=Trigger(type="event", condition="InputEvent"), + scenario_types=["simulation", "replay"], + tags={"team": "platform", "priority": "high"}, + ) + + def test_creation(self): + manifest = self._sample_manifest() + assert manifest.name == "test_agent" + assert manifest.version == "1.0.0" + assert manifest.domain == "testing" + assert manifest.description == "A test agent" + assert manifest.input_contract.features_required == ["feature_a"] + assert manifest.output_contract.events_produced == ["OutputEvent"] + assert manifest.trigger.type == "event" + assert manifest.scenario_types == ["simulation", "replay"] + assert manifest.tags == {"team": "platform", "priority": "high"} + + def test_minimal_creation(self): + manifest = AgentManifest(name="minimal", version="0.1.0") + assert manifest.name == "minimal" + assert manifest.version == "0.1.0" + assert manifest.domain is None + assert manifest.description is None + assert manifest.input_contract.features_required == [] + assert manifest.output_contract.events_produced == [] + assert manifest.trigger is None + assert manifest.scenario_types == [] + assert manifest.tags == {} + + def test_to_dict(self): + manifest = self._sample_manifest() + data = manifest.to_dict() + assert data["name"] == "test_agent" + assert data["version"] == "1.0.0" + assert data["domain"] == "testing" + assert data["input_contract"]["features_required"] == ["feature_a"] + assert data["output_contract"]["events_produced"] == ["OutputEvent"] + assert data["trigger"]["type"] == "event" + assert data["tags"]["team"] == "platform" + + def test_to_dict_removes_none_values(self): + manifest = AgentManifest(name="minimal", version="0.1.0") + data = manifest.to_dict() + assert "domain" not in data + assert "description" not in data + assert "trigger" not in data + + def test_to_json(self): + manifest = self._sample_manifest() + json_str = manifest.to_json() + parsed = json.loads(json_str) + assert parsed["name"] == "test_agent" + assert parsed["version"] == "1.0.0" + + def test_from_dict(self): + data = { + "name": "restored_agent", + "version": "2.0.0", + "domain": "analytics", + "input_contract": { + "features_required": ["metric_x"], + "events_consumed": ["TriggerEvent"], + "tool_capabilities": ["chart_generation"], + }, + "output_contract": { + "events_produced": ["ReportEvent"], + }, + "trigger": {"type": "schedule", "condition": "0 * * * *"}, + "scenario_types": ["backtest"], + "tags": {"env": "prod"}, + } + manifest = AgentManifest.from_dict(data) + assert manifest.name == "restored_agent" + assert manifest.version == "2.0.0" + assert manifest.domain == "analytics" + assert manifest.input_contract.features_required == ["metric_x"] + assert manifest.input_contract.tool_capabilities == ["chart_generation"] + assert manifest.output_contract.events_produced == ["ReportEvent"] + assert manifest.trigger.type == "schedule" + assert manifest.trigger.condition == "0 * * * *" + assert manifest.scenario_types == ["backtest"] + assert manifest.tags == {"env": "prod"} + + def test_from_dict_minimal(self): + data = {"name": "bare", "version": "0.0.1"} + manifest = AgentManifest.from_dict(data) + assert manifest.name == "bare" + assert manifest.trigger is None + assert manifest.input_contract.events_consumed == [] + + def test_roundtrip(self): + original = self._sample_manifest() + data = original.to_dict() + restored = AgentManifest.from_dict(data) + assert restored.name == original.name + assert restored.version == original.version + assert restored.input_contract.features_required == original.input_contract.features_required + assert restored.output_contract.events_produced == original.output_contract.events_produced + assert restored.trigger.type == original.trigger.type + assert restored.scenario_types == original.scenario_types + + def test_from_file(self, tmp_path): + data = { + "name": "file_agent", + "version": "1.0.0", + "input_contract": {"events_consumed": ["FileEvent"]}, + "output_contract": {"events_produced": ["ProcessedEvent"]}, + "trigger": {"type": "event", "condition": "FileEvent"}, + } + file_path = tmp_path / "manifest.json" + file_path.write_text(json.dumps(data)) + + manifest = AgentManifest.from_file(file_path) + assert manifest.name == "file_agent" + assert manifest.input_contract.events_consumed == ["FileEvent"] + assert manifest.trigger.condition == "FileEvent" + + def test_from_file_not_found(self): + with pytest.raises(FileNotFoundError): + AgentManifest.from_file("/nonexistent/path/manifest.json") + + def test_satisfies_all_met(self): + manifest = self._sample_manifest() + satisfied, unmet = manifest.satisfies( + available_events=["InputEvent", "OtherEvent"], + available_features=["feature_a", "feature_b"], + available_capabilities=["data_query", "file_write", "extra_cap"], + ) + assert satisfied is True + assert unmet == [] + + def test_satisfies_missing_event(self): + manifest = self._sample_manifest() + satisfied, unmet = manifest.satisfies( + available_events=["OtherEvent"], + available_features=["feature_a"], + available_capabilities=["data_query", "file_write"], + ) + assert satisfied is False + assert "event:InputEvent" in unmet + + def test_satisfies_missing_feature(self): + manifest = self._sample_manifest() + satisfied, unmet = manifest.satisfies( + available_events=["InputEvent"], + available_features=[], + available_capabilities=["data_query", "file_write"], + ) + assert satisfied is False + assert "feature:feature_a" in unmet + + def test_satisfies_missing_capability(self): + manifest = self._sample_manifest() + satisfied, unmet = manifest.satisfies( + available_events=["InputEvent"], + available_features=["feature_a"], + available_capabilities=["data_query"], + ) + assert satisfied is False + assert "capability:file_write" in unmet + + def test_satisfies_none_checks_skipped(self): + manifest = self._sample_manifest() + # When None is passed, that category is not checked + satisfied, unmet = manifest.satisfies( + available_events=None, + available_features=None, + available_capabilities=None, + ) + assert satisfied is True + assert unmet == [] + + +class TestAgentWithManifest: + """Tests for Agent class with manifest parameter.""" + + def test_agent_without_manifest(self): + """Agent without manifest works as before.""" + from strands import Agent + + # This should not raise — manifest is optional + agent = Agent.__new__(Agent) + # Just verify the attribute would be None by default + # (full Agent init requires model setup which we skip here) + + def test_manifest_accessible(self): + """Manifest is accessible as agent.manifest.""" + manifest = AgentManifest(name="test", version="1.0.0") + # We can't easily instantiate a full Agent in unit tests without mocking, + # but we verify the manifest class is importable and usable + assert manifest.name == "test" From b474c3f84ff0fe4a945de9a34bd80de04f6a38f8 Mon Sep 17 00:00:00 2001 From: imengby Date: Sun, 10 May 2026 11:43:01 +0100 Subject: [PATCH 2/3] =?UTF-8?q?RFC=200002:=20Agent=20Composition=20?= =?UTF-8?q?=E2=80=94=20Declarative=20Multi-Agent=20Coordination?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rfcs/0002-agent-composition.md | 735 +++++++++++++++++++++++++++++++++ 1 file changed, 735 insertions(+) create mode 100644 rfcs/0002-agent-composition.md diff --git a/rfcs/0002-agent-composition.md b/rfcs/0002-agent-composition.md new file mode 100644 index 000000000..6ac0b8ec4 --- /dev/null +++ b/rfcs/0002-agent-composition.md @@ -0,0 +1,735 @@ +# RFC 0002: Agent Composition — Declarative Multi-Agent Coordination + +## Summary + +Add a `strands.composition` module to the SDK that resolves agent dependencies and computes coordination from manifest declarations. Given a set of agents with manifests (RFC 0001) and a set of available resources, the module determines which agents can activate, which are blocked, what's missing, and how they connect — without requiring an orchestrator, workflow definition, or custom wiring code. + +The approach is **declarative**: agents declare what they need (events, features, tools, knowledge bases) and what they produce. The composition engine resolves the full dependency graph — not just event routing, but data readiness, tool availability, and knowledge base access. The coordination structure emerges from declarations, not from imperative wiring. + +## Motivation + +### Problem + +RFC 0001 introduced `AgentManifest` — agents can now declare what they need and what they produce. But declarations alone don't compose agents. Today, after defining manifests, a developer must still: + +1. **Manually check dependencies**: "Can this agent run? Are its required features available? Are the tools it needs bound? Are the events it consumes being produced by anyone?" +2. **Manually wire coordination**: "Agent A produces EventX, Agent B consumes EventX — I need to write the glue that connects them." +3. **Manually manage lifecycle**: "Agent C can't run yet because its data source isn't ready. I'll deploy it later and remember to connect it." + +This manual work is exactly what declarative manifests were designed to eliminate. The missing piece is the **resolution engine** — the logic that reads manifests, evaluates them against available resources, and produces the composition graph automatically. + +### The Core Insight + +In multi-agent orchestration, someone writes the DAG — a workflow definition, a Step Function, a coordinator that calls agents in sequence. That's imperative coordination: you specify HOW agents connect. + +In declarative composition, you specify WHAT each agent needs and produces. The composition engine derives the coordination structure from those declarations. No one writes the chain. The chain assembles itself from the declared contracts. Adding a new agent is a registration act, not a code change to an existing workflow. + +The declarative approach resolves more than just event routing. It evaluates the full dependency surface: + +| Declaration | What the engine resolves | +|---|---| +| Events consumed/produced | Which agent triggers which (routing) | +| Features required | Is the data ready? (data dependency) | +| Tool capabilities needed | Is the implementation bound? (tool binding) | +| Knowledge bases needed | Is the domain knowledge available? (KB readiness) | +| Trigger conditions | When does the agent execute? (activation rules) | + +An agent is only active when ALL its declarations are satisfiable — not just the event routing, but the full dependency surface. + +### Use Cases + +- **Plug-and-play agent addition**: Register a new agent with a manifest. The engine evaluates its declared dependencies against available resources. If all are met, the agent is active and integrated into the coordination graph immediately. If not, it's dormant with a clear gap report. No existing agents are modified in either case. +- **Dependency resolution at registration time**: Before an agent runs, the engine validates its full dependency surface — events, features, tools, knowledge bases. You know at registration time whether the agent can operate, not at runtime when it fails. +- **Automatic coordination graph**: The engine derives the event flow graph and execution order from manifest declarations. No one writes routing rules or workflow definitions. The coordination graph updates automatically when agents are added or removed. +- **Reactive lifecycle management**: When a new resource is registered (a feature computed, a tool bound, a KB indexed), the engine re-evaluates all dormant agents and activates any whose dependencies are now fully satisfied. When a resource is removed, active agents that depended on it transition back to dormant. +- **Gap reporting**: For any dormant agent, the engine reports exactly which dependencies are unresolved and what type they are (event, feature, tool, knowledge base). This is the work backlog — auto-generated from declarations. +- **Impact analysis**: Before removing a resource, the engine can preview which active agents would break. Before adding a resource, it can preview which dormant agents would activate. +- **Local validation**: Developers validate their composition logic on their machine — no cloud platform required. The same declarations that work locally are deployed to managed platforms without modification. + +## Design + +### Building on `AgentManifest.satisfies()` + +RFC 0001 already includes a `satisfies()` method on `AgentManifest` that performs basic dependency checking for a single manifest. RFC 0002 extends this into a full composition engine that operates across multiple agents: + +| `manifest.satisfies()` (RFC 0001) | `strands.composition` (this RFC) | +|---|---| +| Single manifest, single check | Multiple manifests, continuous resolution | +| Returns bool + gaps | Manages lifecycle state transitions | +| No routing | Derives event flow graph from declarations | +| Stateless | Reactive to resource changes | +| No cross-agent awareness | Knows which agents produce what others consume | + +### New Module: `strands.composition` + +```python +# strands/composition/__init__.py + +from strands.composition.registry import CompositionRegistry +from strands.composition.resolver import BindingResolver, BindingStatus, Gap +from strands.composition.router import EventRouter, Route +from strands.composition.lifecycle import LifecycleManager, AgentState +``` + +### CompositionRegistry + +A local catalog of agents and their manifests. Provides lookup and query capabilities. + +```python +# strands/composition/registry.py + +from dataclasses import dataclass, field +from strands import Agent +from strands.agent.manifest import AgentManifest + + +class CompositionRegistry: + """Local registry of agents participating in a composition. + + Holds agent references and their manifests, enabling dependency resolution + and route computation across the set of registered agents. + """ + + def __init__(self): + self._agents: dict[str, Agent] = {} + + def register(self, agent: Agent) -> None: + """Register an agent. Requires agent to have a manifest.""" + if agent.manifest is None: + raise ValueError(f"Cannot register agent without a manifest") + self._agents[agent.manifest.name] = agent + + def unregister(self, name: str) -> None: + """Remove an agent from the registry.""" + del self._agents[name] + + def get(self, name: str) -> Agent | None: + """Get an agent by manifest name.""" + return self._agents.get(name) + + @property + def agents(self) -> list[Agent]: + """All registered agents.""" + return list(self._agents.values()) + + @property + def manifests(self) -> list[AgentManifest]: + """All registered manifests.""" + return [a.manifest for a in self._agents.values()] + + def producers_of(self, event_type: str) -> list[str]: + """Find agents that produce a given event type.""" + return [ + m.name for m in self.manifests + if event_type in m.output_contract.events_produced + ] + + def consumers_of(self, event_type: str) -> list[str]: + """Find agents that consume a given event type.""" + return [ + m.name for m in self.manifests + if event_type in m.input_contract.events_consumed + ] +``` + +### BindingResolver + +Evaluates a manifest against available resources and determines if the agent can activate. + +```python +# strands/composition/resolver.py + +from dataclasses import dataclass, field +from strands.agent.manifest import AgentManifest + + +@dataclass +class Gap: + """A single unresolved dependency.""" + type: str # "event", "feature", "tool", "knowledge_base" + name: str + description: str = "" + + +@dataclass +class BindingStatus: + """Result of evaluating a manifest against available resources.""" + agent_name: str + state: str # "active" or "dormant" + resolved_events: list[str] = field(default_factory=list) + resolved_features: list[str] = field(default_factory=list) + resolved_tools: list[str] = field(default_factory=list) + resolved_knowledge_bases: list[str] = field(default_factory=list) + gaps: list[Gap] = field(default_factory=list) + + def gap_report(self) -> str: + """Human-readable gap report.""" + if not self.gaps: + return f"{self.agent_name}: all bindings resolved (active)" + lines = [f"{self.agent_name}: {len(self.gaps)} unresolved binding(s) (dormant)"] + for gap in self.gaps: + lines.append(f" ❌ {gap.type}: {gap.name}") + return "\n".join(lines) + + +class BindingResolver: + """Evaluates agent manifests against available resources. + + Given a set of available events, features, tools, and knowledge bases, + determines whether each agent's input contract can be satisfied. + """ + + def __init__( + self, + available_events: list[str] | None = None, + available_features: list[str] | None = None, + available_tools: list[str] | None = None, + available_knowledge_bases: list[str] | None = None, + ): + self._events: set[str] = set(available_events or []) + self._features: set[str] = set(available_features or []) + self._tools: set[str] = set(available_tools or []) + self._knowledge_bases: set[str] = set(available_knowledge_bases or []) + + def add_event(self, event: str) -> None: + self._events.add(event) + + def remove_event(self, event: str) -> None: + self._events.discard(event) + + def add_feature(self, feature: str) -> None: + self._features.add(feature) + + def remove_feature(self, feature: str) -> None: + self._features.discard(feature) + + def add_tool(self, tool: str) -> None: + self._tools.add(tool) + + def remove_tool(self, tool: str) -> None: + self._tools.discard(tool) + + def add_knowledge_base(self, kb: str) -> None: + self._knowledge_bases.add(kb) + + def remove_knowledge_base(self, kb: str) -> None: + self._knowledge_bases.discard(kb) + + def evaluate(self, manifest: AgentManifest) -> BindingStatus: + """Evaluate a manifest against current available resources. + + Returns a BindingStatus indicating whether the agent can activate + (all dependencies met) or is dormant (gaps exist). + """ + gaps = [] + resolved_events = [] + resolved_features = [] + resolved_tools = [] + resolved_kbs = [] + + # Check events + for event in manifest.input_contract.events_consumed: + if event in self._events: + resolved_events.append(event) + else: + gaps.append(Gap(type="event", name=event)) + + # Check features + for feature in manifest.input_contract.features_required: + if feature in self._features: + resolved_features.append(feature) + else: + gaps.append(Gap(type="feature", name=feature)) + + # Check tools + for tool in manifest.input_contract.tool_capabilities: + if tool in self._tools: + resolved_tools.append(tool) + else: + gaps.append(Gap(type="tool", name=tool)) + + # Check knowledge bases + for kb in manifest.input_contract.knowledge_bases: + if kb in self._knowledge_bases: + resolved_kbs.append(kb) + else: + gaps.append(Gap(type="knowledge_base", name=kb)) + + return BindingStatus( + agent_name=manifest.name, + state="active" if not gaps else "dormant", + resolved_events=resolved_events, + resolved_features=resolved_features, + resolved_tools=resolved_tools, + resolved_knowledge_bases=resolved_kbs, + gaps=gaps, + ) + + def evaluate_all(self, manifests: list[AgentManifest]) -> list[BindingStatus]: + """Evaluate multiple manifests. Returns status for each.""" + return [self.evaluate(m) for m in manifests] + + def what_would_activate(self, resource_type: str, resource_name: str, manifests: list[AgentManifest]) -> list[str]: + """Preview: which dormant agents would activate if this resource were added? + + Does not mutate state. Useful for impact analysis. + """ + # Temporarily add the resource + original = getattr(self, f"_{resource_type}s").copy() + getattr(self, f"_{resource_type}s").add(resource_name) + + would_activate = [] + for manifest in manifests: + status = self.evaluate(manifest) + if status.state == "active": + # Check if it was dormant before + getattr(self, f"_{resource_type}s").discard(resource_name) + prev_status = self.evaluate(manifest) + getattr(self, f"_{resource_type}s").add(resource_name) + if prev_status.state == "dormant": + would_activate.append(manifest.name) + + # Restore + setattr(self, f"_{resource_type}s", original) + return would_activate +``` + +### EventRouter + +Computes the event flow graph from manifest declarations. + +```python +# strands/composition/router.py + +from dataclasses import dataclass +from strands.composition.registry import CompositionRegistry + + +@dataclass(frozen=True) +class Route: + """A resolved event route: source agent produces event, target agents consume it.""" + event: str + source: str # agent name that produces this event + targets: list[str] # agent names that consume this event + + def __eq__(self, other): + if not isinstance(other, Route): + return False + return (self.event == other.event and + self.source == other.source and + set(self.targets) == set(other.targets)) + + def __hash__(self): + return hash((self.event, self.source, frozenset(self.targets))) + + +class EventRouter: + """Computes event routing from manifest declarations. + + Given a registry of agents, determines how events flow between them + based on output_contract.events_produced and input_contract.events_consumed. + No manual wiring needed — routes emerge from contracts. + """ + + def __init__(self, registry: CompositionRegistry): + self._registry = registry + + def resolve_routes(self) -> list[Route]: + """Compute all event routes across registered agents. + + For each event type produced by any agent, find all agents that + consume it and create a route. + """ + routes = [] + + # Collect all produced event types and their producers + produced: dict[str, list[str]] = {} + for manifest in self._registry.manifests: + for event in manifest.output_contract.events_produced: + produced.setdefault(event, []).append(manifest.name) + + # For each produced event, find consumers + for event_type, producers in produced.items(): + consumers = self._registry.consumers_of(event_type) + if consumers: + for producer in producers: + # Don't route to self + targets = [c for c in consumers if c != producer] + if targets: + routes.append(Route( + event=event_type, + source=producer, + targets=targets, + )) + + return routes + + def routes_from(self, agent_name: str) -> list[Route]: + """Get all routes originating from a specific agent.""" + all_routes = self.resolve_routes() + return [r for r in all_routes if r.source == agent_name] + + def routes_to(self, agent_name: str) -> list[Route]: + """Get all routes targeting a specific agent.""" + all_routes = self.resolve_routes() + return [r for r in all_routes if agent_name in r.targets] + + def dependency_order(self) -> list[list[str]]: + """Topological sort of agents by event dependencies. + + Returns layers: agents in layer 0 have no event dependencies, + agents in layer 1 depend only on layer 0 outputs, etc. + Useful for understanding execution order. + """ + manifests = {m.name: m for m in self._registry.manifests} + + # Build dependency graph + deps: dict[str, set[str]] = {name: set() for name in manifests} + for manifest in manifests.values(): + for event in manifest.input_contract.events_consumed: + producers = self._registry.producers_of(event) + for p in producers: + if p != manifest.name: + deps[manifest.name].add(p) + + # Topological sort (Kahn's algorithm) + layers = [] + remaining = dict(deps) + + while remaining: + # Find agents with no unresolved dependencies + layer = [n for n, d in remaining.items() if not d] + if not layer: + # Cycle detected — return what we have + layer = list(remaining.keys()) + layers.append(layer) + break + layers.append(layer) + # Remove resolved agents from dependencies + for n in layer: + del remaining[n] + for d in remaining.values(): + d -= set(layer) + + return layers + + def visualize(self) -> str: + """ASCII visualization of the event flow graph.""" + routes = self.resolve_routes() + if not routes: + return "(no routes)" + + lines = [] + for route in routes: + targets_str = ", ".join(route.targets) + lines.append(f" {route.source} ──[{route.event}]──▶ {targets_str}") + + return "Event Flow:\n" + "\n".join(sorted(lines)) +``` + +### LifecycleManager + +Manages dormant/active transitions reactively as resources change. + +```python +# strands/composition/lifecycle.py + +from dataclasses import dataclass, field +from datetime import datetime +from strands.composition.registry import CompositionRegistry +from strands.composition.resolver import BindingResolver, BindingStatus + + +@dataclass +class StateTransition: + """Record of an agent state change.""" + agent_name: str + from_state: str + to_state: str + timestamp: datetime + reason: str # what resource triggered the transition + + +@dataclass +class AgentState: + """Current state of an agent in the composition.""" + name: str + state: str # "active" or "dormant" + binding_status: BindingStatus + + +class LifecycleManager: + """Manages agent lifecycle based on resource availability. + + When resources are added or removed, re-evaluates all agents and + transitions them between dormant and active states. + """ + + def __init__(self, registry: CompositionRegistry, resolver: BindingResolver): + self._registry = registry + self._resolver = resolver + self._states: dict[str, str] = {} # agent_name → state + self._history: list[StateTransition] = [] + + # Initial evaluation + self._evaluate_all() + + def _evaluate_all(self) -> list[StateTransition]: + """Re-evaluate all agents and return any state transitions.""" + transitions = [] + for manifest in self._registry.manifests: + status = self._resolver.evaluate(manifest) + old_state = self._states.get(manifest.name, "dormant") + new_state = status.state + + if old_state != new_state: + transition = StateTransition( + agent_name=manifest.name, + from_state=old_state, + to_state=new_state, + timestamp=datetime.now(), + reason="re-evaluation", + ) + transitions.append(transition) + self._history.append(transition) + + self._states[manifest.name] = new_state + + return transitions + + def add_resource(self, resource_type: str, name: str) -> list[StateTransition]: + """Add a resource and return any resulting state transitions. + + resource_type: "event", "feature", "tool", or "knowledge_base" + """ + add_method = getattr(self._resolver, f"add_{resource_type}") + add_method(name) + + transitions = self._evaluate_all() + for t in transitions: + t.reason = f"{resource_type} '{name}' added" + + return transitions + + def remove_resource(self, resource_type: str, name: str) -> list[StateTransition]: + """Remove a resource and return any resulting state transitions.""" + remove_method = getattr(self._resolver, f"remove_{resource_type}") + remove_method(name) + + transitions = self._evaluate_all() + for t in transitions: + t.reason = f"{resource_type} '{name}' removed" + + return transitions + + def get_state(self, agent_name: str) -> AgentState: + """Get current state of an agent.""" + manifest = self._registry.get(agent_name).manifest + status = self._resolver.evaluate(manifest) + return AgentState(name=agent_name, state=status.state, binding_status=status) + + @property + def active_agents(self) -> list[str]: + return [name for name, state in self._states.items() if state == "active"] + + @property + def dormant_agents(self) -> list[str]: + return [name for name, state in self._states.items() if state == "dormant"] + + @property + def history(self) -> list[StateTransition]: + return list(self._history) +``` + +## Usage Examples + +### Example 1: Validate a composition before deployment + +```python +from strands import Agent +from strands.agent.manifest import AgentManifest, InputContract, OutputContract, Trigger +from strands.composition import CompositionRegistry, BindingResolver, EventRouter + +# Define agents +rca_agent = Agent( + manifest=AgentManifest( + name="rca_agent", + version="1.0.0", + input_contract=InputContract(events_consumed=["AlarmEvent"]), + output_contract=OutputContract(events_produced=["FaultEvent"]), + trigger=Trigger(type="event", condition="AlarmEvent"), + ), + ... +) + +impact_agent = Agent( + manifest=AgentManifest( + name="impact_agent", + version="1.0.0", + input_contract=InputContract( + events_consumed=["FaultEvent"], + features_required=["service_mapping"], + ), + output_contract=OutputContract(events_produced=["ImpactEvent"]), + trigger=Trigger(type="event", condition="FaultEvent"), + ), + ... +) + +# Check composition +registry = CompositionRegistry() +registry.register(rca_agent) +registry.register(impact_agent) + +resolver = BindingResolver( + available_events=["AlarmEvent"], + available_features=[], # service_mapping not yet available + available_tools=[], +) + +for manifest in registry.manifests: + status = resolver.evaluate(manifest) + print(status.gap_report()) + +# Output: +# rca_agent: all bindings resolved (active) +# impact_agent: 1 unresolved binding(s) (dormant) +# ❌ feature: service_mapping + +# Visualize the routes (only active agents would execute) +router = EventRouter(registry) +print(router.visualize()) + +# Output: +# Event Flow: +# rca_agent ──[FaultEvent]──▶ impact_agent +``` + +### Example 2: Simulate resource arrival + +```python +from strands.composition import LifecycleManager + +manager = LifecycleManager(registry, resolver) + +print(manager.active_agents) # ["rca_agent"] +print(manager.dormant_agents) # ["impact_agent"] + +# Feature arrives +transitions = manager.add_resource("feature", "service_mapping") + +for t in transitions: + print(f"{t.agent_name}: {t.from_state} → {t.to_state} ({t.reason})") + +# Output: +# impact_agent: dormant → active (feature 'service_mapping' added) + +print(manager.active_agents) # ["rca_agent", "impact_agent"] +``` + +### Example 3: Impact analysis + +```python +# What breaks if I remove this feature? +transitions = manager.remove_resource("feature", "service_mapping") + +for t in transitions: + print(f"{t.agent_name}: {t.from_state} → {t.to_state} ({t.reason})") + +# Output: +# impact_agent: active → dormant (feature 'service_mapping' removed) +``` + +### Example 4: Execution order + +```python +router = EventRouter(registry) +layers = router.dependency_order() + +# layers = [ +# ["rca_agent"], # layer 0: no event dependencies (triggered by external AlarmEvent) +# ["impact_agent"], # layer 1: depends on rca_agent's FaultEvent +# ] +``` + +## When to Use + +Use `strands.composition` when: +- You have multiple agents with declared dependencies (manifests) +- You want to validate that a multi-agent system's full dependency surface is satisfiable — not just events, but features, tools, and knowledge bases +- You want the coordination structure derived from declarations, not written as imperative code +- You need lifecycle management (dormant until all dependencies are met, not just event subscriptions) +- You want to understand the impact of adding or removing any resource type +- You want to test composition logic locally before deploying to a managed platform +- You want plug-and-play extensibility: new agents register and activate without modifying existing agents + +Do NOT use `strands.composition` when: +- You have a single agent with no dependencies on other agents +- Your agents communicate through direct function calls (not declarative contracts) +- You prefer explicit imperative orchestration (Step Functions, DAG definitions) and accept the maintenance cost of updating the DAG when agents change +- Your agents have no formal input/output contracts + +## Relationship to Existing Multi-Agent Patterns + +Strands already provides two multi-agent coordination patterns: + +- **Graph** (`strands.multiagent.graph`): Deterministic DAG execution. The developer explicitly defines nodes, edges, and entry points. The structure is imperative — you write it, you maintain it. Best for: fixed workflows where the execution order is known at design time. + +- **Swarm** (`strands.multiagent.swarm`): Self-organizing agents with shared context. Agents hand off to each other at runtime via a `handoff_to_agent` tool. The structure is emergent at runtime — agents decide who to call based on the task. Best for: collaborative problem-solving where the right agent depends on the conversation. + +**Composition** (this RFC) adds a third pattern: + +- **Composition** (`strands.composition`): Declarative coordination. Agents declare what they need and produce via manifests. The structure is derived from declarations — no one writes it, no one maintains it. Adding an agent is registering a manifest. Best for: systems where agents are developed independently, deployed incrementally, and must compose without modifying each other. + +| | Graph | Swarm | Composition | +|---|---|---|---| +| Topology defined by | Developer (imperative) | Agents at runtime | Manifest declarations | +| Adding a new agent | Edit graph code | Add to swarm node list | Register manifest | +| Dependency validation | None (runtime failure) | None (runtime failure) | At registration time | +| Execution model | Batch (run full DAG) | Interactive (handoff loop) | Event-driven (trigger on input) | +| Best for | Fixed pipelines | Collaborative tasks | Plug-and-play extensibility | + +These patterns are complementary. A Graph node could be an agent with a manifest. A Swarm could use manifests for discovery. Composition provides the dependency resolution layer that neither Graph nor Swarm has today. + +--- + +The composition module is pure resolution logic — it computes binding status, routes, and lifecycle transitions. It does not execute agents, deliver events, or persist state. Those responsibilities belong to the execution environment: + +- **Local development**: Resolution results are informational. Developer uses them to validate and understand the composition. +- **Test harness**: A test runner reads routes and invokes agents in dependency order. +- **Managed platform (e.g., AgentCore)**: The platform reads the same manifests, uses the same declarative resolution logic, and implements the coordination via managed services (EventBridge for routing, Registry for lifecycle, Policy for contract enforcement). + +The composition module defines the **declarative protocol**. Managed platforms provide the **production execution**. + +## Backward Compatibility + +- New module, no changes to existing SDK APIs +- Requires RFC 0001 (AgentManifest) to be merged first +- No new required dependencies (pure Python, standard library only) +- Agents without manifests cannot participate in composition (by design) +- If this is considered too opinionated for core, it can live as a standalone package (`strands-agents/composition`) with no changes to the implementation + +## Alternatives Considered + +1. **Orchestrator pattern (imperative DAG)**: Rejected. Requires someone to write and maintain the workflow definition. Adding an agent means modifying the orchestrator. The declarative approach is additive — new agents plug in without modifying existing definitions. + +2. **Pub/Sub only (no dependency validation)**: Rejected. Pub/Sub solves routing but not readiness. An agent can subscribe to a topic but still fail because a feature isn't available or a tool isn't bound. Declarative composition validates the full dependency surface, not just event subscriptions. + +3. **Blackboard pattern (shared state)**: Rejected. No explicit contracts, no dependency validation, no lifecycle management. Agents silently fail when expected state isn't present. + +4. **Convention-based routing (match by name)**: Rejected. Fragile. Explicit declarative contracts are more reliable than naming conventions. + +5. **Build into Agent class directly**: Rejected. Composition is a concern of the system, not the individual agent. Keeping it in a separate module maintains separation of concerns — agents declare, the composition engine resolves. + +## Implementation Plan + +1. Merge RFC 0001 (AgentManifest) — prerequisite +2. Add `src/strands/composition/__init__.py` with module exports +3. Add `src/strands/composition/registry.py` — CompositionRegistry +4. Add `src/strands/composition/resolver.py` — BindingResolver, BindingStatus, Gap +5. Add `src/strands/composition/router.py` — EventRouter, Route +6. Add `src/strands/composition/lifecycle.py` — LifecycleManager, AgentState, StateTransition +7. Add unit tests for all components +8. Add integration test: multi-agent composition scenario end-to-end (in-process) +9. Add documentation page with examples +10. Export from `strands.composition` From 6e2afdf932f1a45334fdb9de1cc78a44247a8251 Mon Sep 17 00:00:00 2001 From: imengby Date: Sun, 10 May 2026 11:55:39 +0100 Subject: [PATCH 3/3] =?UTF-8?q?feat:=20add=20strands.composition=20module?= =?UTF-8?q?=20=E2=80=94=20implementation=20+=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the composition engine proposed in RFC 0002: - CompositionRegistry: local catalog of agents and manifests - BindingResolver: evaluates full dependency surface, reports gaps - EventRouter: derives event flow graph from declarations - LifecycleManager: reactive dormant/active transitions All 31 tests passing. Pure Python, no external dependencies. --- src/strands/composition/__init__.py | 37 ++ src/strands/composition/lifecycle.py | 196 ++++++++ src/strands/composition/registry.py | 166 +++++++ src/strands/composition/resolver.py | 249 +++++++++++ src/strands/composition/router.py | 173 ++++++++ tests/strands/composition/__init__.py | 1 + tests/strands/composition/test_composition.py | 418 ++++++++++++++++++ 7 files changed, 1240 insertions(+) create mode 100644 src/strands/composition/__init__.py create mode 100644 src/strands/composition/lifecycle.py create mode 100644 src/strands/composition/registry.py create mode 100644 src/strands/composition/resolver.py create mode 100644 src/strands/composition/router.py create mode 100644 tests/strands/composition/__init__.py create mode 100644 tests/strands/composition/test_composition.py diff --git a/src/strands/composition/__init__.py b/src/strands/composition/__init__.py new file mode 100644 index 000000000..3fa399944 --- /dev/null +++ b/src/strands/composition/__init__.py @@ -0,0 +1,37 @@ +"""Agent Composition — Declarative Multi-Agent Coordination. + +This module provides declarative composition for multi-agent systems. Agents declare +what they need and what they produce via manifests. The composition engine resolves +dependencies, derives event routing, and manages agent lifecycle automatically. + +Key components: + CompositionRegistry: Local catalog of agents and their manifests. + BindingResolver: Evaluates manifests against available resources. + EventRouter: Derives event flow graph from manifest declarations. + LifecycleManager: Manages dormant/active transitions reactively. + +Example: + >>> from strands.composition import CompositionRegistry, BindingResolver, EventRouter + >>> registry = CompositionRegistry() + >>> registry.register(my_agent) + >>> resolver = BindingResolver(available_events=["DataEvent"], available_tools=["query"]) + >>> status = resolver.evaluate(my_agent.manifest) + >>> print(status.gap_report()) +""" + +from strands.composition.lifecycle import AgentState, LifecycleManager, StateTransition +from strands.composition.registry import CompositionRegistry +from strands.composition.resolver import BindingResolver, BindingStatus, Gap +from strands.composition.router import EventRouter, Route + +__all__ = [ + "AgentState", + "BindingResolver", + "BindingStatus", + "CompositionRegistry", + "EventRouter", + "Gap", + "LifecycleManager", + "Route", + "StateTransition", +] diff --git a/src/strands/composition/lifecycle.py b/src/strands/composition/lifecycle.py new file mode 100644 index 000000000..fa2444cd9 --- /dev/null +++ b/src/strands/composition/lifecycle.py @@ -0,0 +1,196 @@ +"""Lifecycle Manager — Manages dormant/active transitions reactively. + +When resources are added or removed, re-evaluates all agents and +transitions them between dormant and active states. +""" + +import logging +from dataclasses import dataclass, field +from datetime import datetime + +from strands.composition.registry import CompositionRegistry +from strands.composition.resolver import BindingResolver, BindingStatus + +logger = logging.getLogger(__name__) + + +@dataclass +class StateTransition: + """Record of an agent state change. + + Attributes: + agent_name: Name of the agent that transitioned. + from_state: Previous state ("active" or "dormant"). + to_state: New state ("active" or "dormant"). + timestamp: When the transition occurred. + reason: What triggered the transition. + """ + + agent_name: str + from_state: str + to_state: str + timestamp: datetime + reason: str + + +@dataclass +class AgentState: + """Current state of an agent in the composition. + + Attributes: + name: Agent name. + state: Current state ("active" or "dormant"). + binding_status: Full binding evaluation result. + """ + + name: str + state: str + binding_status: BindingStatus + + +class LifecycleManager: + """Manages agent lifecycle based on resource availability. + + When resources are added or removed, re-evaluates all agents and + transitions them between dormant and active states. Maintains a + history of all transitions for audit. + + Example: + >>> manager = LifecycleManager(registry, resolver) + >>> print(manager.active_agents) + ['rca_agent'] + >>> transitions = manager.add_resource("feature", "service_mapping") + >>> print(transitions[0].agent_name, transitions[0].to_state) + impact_agent active + """ + + def __init__(self, registry: CompositionRegistry, resolver: BindingResolver) -> None: + """Initialize with a registry and resolver. + + Performs initial evaluation of all registered agents. + + Args: + registry: The composition registry containing agents. + resolver: The binding resolver with current available resources. + """ + self._registry = registry + self._resolver = resolver + self._states: dict[str, str] = {} + self._history: list[StateTransition] = [] + + self._evaluate_all() + + def _evaluate_all(self) -> list[StateTransition]: + """Re-evaluate all agents and return any state transitions.""" + transitions: list[StateTransition] = [] + for manifest in self._registry.manifests: + status = self._resolver.evaluate(manifest) + old_state = self._states.get(manifest.name, "dormant") + new_state = status.state + + if old_state != new_state: + transition = StateTransition( + agent_name=manifest.name, + from_state=old_state, + to_state=new_state, + timestamp=datetime.now(), + reason="re-evaluation", + ) + transitions.append(transition) + self._history.append(transition) + logger.info( + "agent=%s | %s -> %s (%s)", + manifest.name, + old_state, + new_state, + transition.reason, + ) + + self._states[manifest.name] = new_state + + return transitions + + def add_resource(self, resource_type: str, name: str) -> list[StateTransition]: + """Add a resource and return any resulting state transitions. + + Args: + resource_type: One of "event", "feature", "tool", "knowledge_base". + name: Name of the resource being added. + + Returns: + List of state transitions triggered by this addition. + """ + add_method = getattr(self._resolver, f"add_{resource_type}") + add_method(name) + + transitions = self._evaluate_all() + for t in transitions: + t.reason = f"{resource_type} '{name}' added" + + return transitions + + def remove_resource(self, resource_type: str, name: str) -> list[StateTransition]: + """Remove a resource and return any resulting state transitions. + + Args: + resource_type: One of "event", "feature", "tool", "knowledge_base". + name: Name of the resource being removed. + + Returns: + List of state transitions triggered by this removal. + """ + remove_method = getattr(self._resolver, f"remove_{resource_type}") + remove_method(name) + + transitions = self._evaluate_all() + for t in transitions: + t.reason = f"{resource_type} '{name}' removed" + + return transitions + + def get_state(self, agent_name: str) -> AgentState: + """Get current state of an agent. + + Args: + agent_name: The agent name to query. + + Returns: + AgentState with current state and full binding status. + + Raises: + KeyError: If the agent is not registered. + """ + manifest = self._registry.get_manifest(agent_name) + if manifest is None: + raise KeyError(f"Agent '{agent_name}' not found in registry") + status = self._resolver.evaluate(manifest) + return AgentState(name=agent_name, state=status.state, binding_status=status) + + @property + def active_agents(self) -> list[str]: + """List of agent names currently in active state.""" + return [name for name, state in self._states.items() if state == "active"] + + @property + def dormant_agents(self) -> list[str]: + """List of agent names currently in dormant state.""" + return [name for name, state in self._states.items() if state == "dormant"] + + @property + def history(self) -> list[StateTransition]: + """Full history of state transitions.""" + return list(self._history) + + def summary(self) -> str: + """Human-readable summary of current composition state. + + Returns: + Formatted string showing active/dormant agents and their gaps. + """ + lines = [f"Active ({len(self.active_agents)}): {', '.join(self.active_agents) or '(none)'}"] + lines.append(f"Dormant ({len(self.dormant_agents)}):") + for name in self.dormant_agents: + state = self.get_state(name) + gaps = [f"{g.type}:{g.name}" for g in state.binding_status.gaps] + lines.append(f" {name}: needs [{', '.join(gaps)}]") + return "\n".join(lines) diff --git a/src/strands/composition/registry.py b/src/strands/composition/registry.py new file mode 100644 index 000000000..b1c7f3957 --- /dev/null +++ b/src/strands/composition/registry.py @@ -0,0 +1,166 @@ +"""Composition Registry — Local catalog of agents participating in a composition. + +Provides lookup and query capabilities across registered agents and their manifests. +""" + +import logging +from typing import Optional + +from strands.agent.manifest import AgentManifest + +logger = logging.getLogger(__name__) + + +class CompositionRegistry: + """Local registry of agents participating in a composition. + + Holds agent references and their manifests, enabling dependency resolution + and route computation across the set of registered agents. + + Example: + >>> from strands.composition import CompositionRegistry + >>> registry = CompositionRegistry() + >>> registry.register(my_agent) + >>> registry.producers_of("FaultEvent") + ['rca_agent'] + """ + + def __init__(self) -> None: + """Initialize an empty registry.""" + self._agents: dict[str, object] = {} + self._manifests: dict[str, AgentManifest] = {} + + def register(self, agent: object) -> None: + """Register an agent. Requires agent to have a manifest attribute. + + Args: + agent: An Agent instance with a manifest attribute. + + Raises: + ValueError: If the agent has no manifest. + """ + manifest = getattr(agent, "manifest", None) + if manifest is None: + raise ValueError("Cannot register agent without a manifest") + if not isinstance(manifest, AgentManifest): + raise ValueError("Agent manifest must be an AgentManifest instance") + + self._agents[manifest.name] = agent + self._manifests[manifest.name] = manifest + logger.debug("agent=%s | registered in composition registry", manifest.name) + + def register_manifest(self, manifest: AgentManifest) -> None: + """Register a manifest without an agent instance. + + Useful for validating compositions before agents are instantiated. + + Args: + manifest: An AgentManifest instance. + """ + self._manifests[manifest.name] = manifest + logger.debug("manifest=%s | registered in composition registry (no agent instance)", manifest.name) + + def unregister(self, name: str) -> None: + """Remove an agent from the registry. + + Args: + name: The manifest name of the agent to remove. + + Raises: + KeyError: If the agent is not registered. + """ + if name not in self._manifests: + raise KeyError(f"Agent '{name}' not found in registry") + self._manifests.pop(name) + self._agents.pop(name, None) + logger.debug("agent=%s | unregistered from composition registry", name) + + def get(self, name: str) -> Optional[object]: + """Get an agent by manifest name. + + Args: + name: The manifest name. + + Returns: + The agent instance, or None if not found. + """ + return self._agents.get(name) + + def get_manifest(self, name: str) -> Optional[AgentManifest]: + """Get a manifest by name. + + Args: + name: The manifest name. + + Returns: + The AgentManifest instance, or None if not found. + """ + return self._manifests.get(name) + + @property + def agents(self) -> list[object]: + """All registered agent instances.""" + return list(self._agents.values()) + + @property + def manifests(self) -> list[AgentManifest]: + """All registered manifests.""" + return list(self._manifests.values()) + + @property + def names(self) -> list[str]: + """All registered agent names.""" + return list(self._manifests.keys()) + + def __len__(self) -> int: + """Number of registered manifests.""" + return len(self._manifests) + + def __contains__(self, name: str) -> bool: + """Check if an agent name is registered.""" + return name in self._manifests + + def producers_of(self, event_type: str) -> list[str]: + """Find agents that produce a given event type. + + Args: + event_type: The event type to search for. + + Returns: + List of agent names that produce this event type. + """ + return [ + m.name + for m in self._manifests.values() + if event_type in m.output_contract.events_produced + ] + + def consumers_of(self, event_type: str) -> list[str]: + """Find agents that consume a given event type. + + Args: + event_type: The event type to search for. + + Returns: + List of agent names that consume this event type. + """ + return [ + m.name + for m in self._manifests.values() + if event_type in m.input_contract.events_consumed + ] + + def providers_of_feature(self, feature_name: str) -> list[str]: + """Find agents that produce a given feature. + + Args: + feature_name: The feature name to search for. + + Returns: + List of agent names that produce this feature. + """ + return [ + m.name + for m in self._manifests.values() + if feature_name in m.output_contract.features_produced + ] diff --git a/src/strands/composition/resolver.py b/src/strands/composition/resolver.py new file mode 100644 index 000000000..55cf081fe --- /dev/null +++ b/src/strands/composition/resolver.py @@ -0,0 +1,249 @@ +"""Binding Resolver — Evaluates agent manifests against available resources. + +Determines whether each agent's input contract can be satisfied given the +currently available events, features, tools, and knowledge bases. +""" + +import logging +from dataclasses import dataclass, field + +from strands.agent.manifest import AgentManifest + +logger = logging.getLogger(__name__) + + +@dataclass +class Gap: + """A single unresolved dependency. + + Attributes: + type: The resource type ("event", "feature", "tool", "knowledge_base"). + name: The name of the missing resource. + description: Optional human-readable description. + """ + + type: str + name: str + description: str = "" + + +@dataclass +class BindingStatus: + """Result of evaluating a manifest against available resources. + + Attributes: + agent_name: Name of the evaluated agent. + state: "active" if all dependencies met, "dormant" if gaps exist. + resolved_events: Events that are available. + resolved_features: Features that are available. + resolved_tools: Tools that are available. + resolved_knowledge_bases: Knowledge bases that are available. + gaps: List of unresolved dependencies. + """ + + agent_name: str + state: str + resolved_events: list[str] = field(default_factory=list) + resolved_features: list[str] = field(default_factory=list) + resolved_tools: list[str] = field(default_factory=list) + resolved_knowledge_bases: list[str] = field(default_factory=list) + gaps: list[Gap] = field(default_factory=list) + + def gap_report(self) -> str: + """Human-readable gap report. + + Returns: + Formatted string showing agent state and any unresolved dependencies. + """ + if not self.gaps: + return f"{self.agent_name}: all bindings resolved (active)" + lines = [f"{self.agent_name}: {len(self.gaps)} unresolved binding(s) (dormant)"] + for gap in self.gaps: + lines.append(f" - {gap.type}: {gap.name}") + return "\n".join(lines) + + +class BindingResolver: + """Evaluates agent manifests against available resources. + + Given a set of available events, features, tools, and knowledge bases, + determines whether each agent's input contract can be satisfied. + + Example: + >>> resolver = BindingResolver( + ... available_events=["AlarmEvent"], + ... available_features=["device_health"], + ... available_tools=["query_tool"], + ... ) + >>> status = resolver.evaluate(agent.manifest) + >>> print(status.state) # "active" or "dormant" + """ + + def __init__( + self, + available_events: list[str] | None = None, + available_features: list[str] | None = None, + available_tools: list[str] | None = None, + available_knowledge_bases: list[str] | None = None, + ) -> None: + """Initialize with available resources. + + Args: + available_events: Event types currently available. + available_features: Feature names currently available. + available_tools: Tool capability names currently registered. + available_knowledge_bases: Knowledge base names currently available. + """ + self._events: set[str] = set(available_events or []) + self._features: set[str] = set(available_features or []) + self._tools: set[str] = set(available_tools or []) + self._knowledge_bases: set[str] = set(available_knowledge_bases or []) + + @property + def available_events(self) -> set[str]: + """Currently available events.""" + return self._events.copy() + + @property + def available_features(self) -> set[str]: + """Currently available features.""" + return self._features.copy() + + @property + def available_tools(self) -> set[str]: + """Currently available tools.""" + return self._tools.copy() + + @property + def available_knowledge_bases(self) -> set[str]: + """Currently available knowledge bases.""" + return self._knowledge_bases.copy() + + def add_event(self, event: str) -> None: + """Register a new available event.""" + self._events.add(event) + + def remove_event(self, event: str) -> None: + """Remove an available event.""" + self._events.discard(event) + + def add_feature(self, feature: str) -> None: + """Register a new available feature.""" + self._features.add(feature) + + def remove_feature(self, feature: str) -> None: + """Remove an available feature.""" + self._features.discard(feature) + + def add_tool(self, tool: str) -> None: + """Register a new available tool.""" + self._tools.add(tool) + + def remove_tool(self, tool: str) -> None: + """Remove an available tool.""" + self._tools.discard(tool) + + def add_knowledge_base(self, kb: str) -> None: + """Register a new available knowledge base.""" + self._knowledge_bases.add(kb) + + def remove_knowledge_base(self, kb: str) -> None: + """Remove an available knowledge base.""" + self._knowledge_bases.discard(kb) + + def evaluate(self, manifest: AgentManifest) -> BindingStatus: + """Evaluate a manifest against current available resources. + + Args: + manifest: The agent manifest to evaluate. + + Returns: + BindingStatus indicating whether the agent can activate + (all dependencies met) or is dormant (gaps exist). + """ + gaps: list[Gap] = [] + resolved_events: list[str] = [] + resolved_features: list[str] = [] + resolved_tools: list[str] = [] + resolved_kbs: list[str] = [] + + for event in manifest.input_contract.events_consumed: + if event in self._events: + resolved_events.append(event) + else: + gaps.append(Gap(type="event", name=event)) + + for feature in manifest.input_contract.features_required: + if feature in self._features: + resolved_features.append(feature) + else: + gaps.append(Gap(type="feature", name=feature)) + + for tool in manifest.input_contract.tool_capabilities: + if tool in self._tools: + resolved_tools.append(tool) + else: + gaps.append(Gap(type="tool", name=tool)) + + for kb in manifest.input_contract.knowledge_bases: + if kb in self._knowledge_bases: + resolved_kbs.append(kb) + else: + gaps.append(Gap(type="knowledge_base", name=kb)) + + state = "active" if not gaps else "dormant" + + return BindingStatus( + agent_name=manifest.name, + state=state, + resolved_events=resolved_events, + resolved_features=resolved_features, + resolved_tools=resolved_tools, + resolved_knowledge_bases=resolved_kbs, + gaps=gaps, + ) + + def evaluate_all(self, manifests: list[AgentManifest]) -> list[BindingStatus]: + """Evaluate multiple manifests. + + Args: + manifests: List of manifests to evaluate. + + Returns: + List of BindingStatus, one per manifest. + """ + return [self.evaluate(m) for m in manifests] + + def what_would_activate( + self, resource_type: str, resource_name: str, manifests: list[AgentManifest] + ) -> list[str]: + """Preview which dormant agents would activate if a resource were added. + + Does not mutate state. + + Args: + resource_type: One of "event", "feature", "tool", "knowledge_base". + resource_name: Name of the resource to simulate adding. + manifests: List of manifests to check. + + Returns: + List of agent names that would transition from dormant to active. + """ + attr_name = f"_{resource_type}s" + original = getattr(self, attr_name).copy() + + getattr(self, attr_name).add(resource_name) + + would_activate = [] + for manifest in manifests: + status = self.evaluate(manifest) + if status.state == "active": + # Check if it was dormant before + getattr(self, attr_name).discard(resource_name) + prev_status = self.evaluate(manifest) + getattr(self, attr_name).add(resource_name) + if prev_status.state == "dormant": + would_activate.append(manifest.name) + + setattr(self, attr_name, original) + return would_activate diff --git a/src/strands/composition/router.py b/src/strands/composition/router.py new file mode 100644 index 000000000..65398c579 --- /dev/null +++ b/src/strands/composition/router.py @@ -0,0 +1,173 @@ +"""Event Router — Derives event flow graph from manifest declarations. + +Computes how events flow between agents based on their declared +output_contract.events_produced and input_contract.events_consumed. +""" + +import logging +from dataclasses import dataclass + +from strands.composition.registry import CompositionRegistry + +logger = logging.getLogger(__name__) + + +@dataclass +class Route: + """A resolved event route: source agent produces event, target agents consume it. + + Attributes: + event: The event type being routed. + source: Agent name that produces this event. + targets: Agent names that consume this event. + """ + + event: str + source: str + targets: list[str] + + def __eq__(self, other: object) -> bool: + """Equality based on event, source, and targets (order-independent).""" + if not isinstance(other, Route): + return False + return ( + self.event == other.event + and self.source == other.source + and set(self.targets) == set(other.targets) + ) + + def __hash__(self) -> int: + """Hash based on event, source, and targets.""" + return hash((self.event, self.source, frozenset(self.targets))) + + def __repr__(self) -> str: + """Readable representation.""" + targets_str = ", ".join(self.targets) + return f"Route({self.source} --[{self.event}]--> [{targets_str}])" + + +class EventRouter: + """Computes event routing from manifest declarations. + + Given a registry of agents, determines how events flow between them + based on output_contract.events_produced and input_contract.events_consumed. + No manual wiring needed — routes emerge from contracts. + + Example: + >>> router = EventRouter(registry) + >>> routes = router.resolve_routes() + >>> print(router.visualize()) + Event Flow: + rca_agent --[FaultEvent]--> impact_agent + """ + + def __init__(self, registry: CompositionRegistry) -> None: + """Initialize with a composition registry. + + Args: + registry: The registry containing agents and their manifests. + """ + self._registry = registry + + def resolve_routes(self) -> list[Route]: + """Compute all event routes across registered agents. + + For each event type produced by any agent, find all agents that + consume it and create a route. + + Returns: + List of Route objects representing the event flow graph. + """ + routes: list[Route] = [] + + produced: dict[str, list[str]] = {} + for manifest in self._registry.manifests: + for event in manifest.output_contract.events_produced: + produced.setdefault(event, []).append(manifest.name) + + for event_type, producers in produced.items(): + consumers = self._registry.consumers_of(event_type) + if consumers: + for producer in producers: + targets = [c for c in consumers if c != producer] + if targets: + routes.append(Route(event=event_type, source=producer, targets=targets)) + + return routes + + def routes_from(self, agent_name: str) -> list[Route]: + """Get all routes originating from a specific agent. + + Args: + agent_name: The agent name to filter by. + + Returns: + List of routes where this agent is the source. + """ + return [r for r in self.resolve_routes() if r.source == agent_name] + + def routes_to(self, agent_name: str) -> list[Route]: + """Get all routes targeting a specific agent. + + Args: + agent_name: The agent name to filter by. + + Returns: + List of routes where this agent is a target. + """ + return [r for r in self.resolve_routes() if agent_name in r.targets] + + def dependency_order(self) -> list[list[str]]: + """Topological sort of agents by event dependencies. + + Returns layers where agents in layer 0 have no event dependencies, + agents in layer 1 depend only on layer 0 outputs, etc. + + Returns: + List of layers, each layer is a list of agent names that can + execute in parallel. + """ + manifests = {m.name: m for m in self._registry.manifests} + + deps: dict[str, set[str]] = {name: set() for name in manifests} + for manifest in manifests.values(): + for event in manifest.input_contract.events_consumed: + producers = self._registry.producers_of(event) + for p in producers: + if p != manifest.name: + deps[manifest.name].add(p) + + layers: list[list[str]] = [] + remaining = {k: set(v) for k, v in deps.items()} + + while remaining: + layer = [n for n, d in remaining.items() if not d] + if not layer: + # Cycle detected — include remaining as final layer + layer = list(remaining.keys()) + layers.append(layer) + break + layers.append(layer) + for n in layer: + del remaining[n] + for d in remaining.values(): + d -= set(layer) + + return layers + + def visualize(self) -> str: + """ASCII visualization of the event flow graph. + + Returns: + Formatted string showing all event routes. + """ + routes = self.resolve_routes() + if not routes: + return "(no routes)" + + lines: list[str] = [] + for route in routes: + targets_str = ", ".join(route.targets) + lines.append(f" {route.source} --[{route.event}]--> {targets_str}") + + return "Event Flow:\n" + "\n".join(sorted(lines)) diff --git a/tests/strands/composition/__init__.py b/tests/strands/composition/__init__.py new file mode 100644 index 000000000..8390f458f --- /dev/null +++ b/tests/strands/composition/__init__.py @@ -0,0 +1 @@ +# Composition tests diff --git a/tests/strands/composition/test_composition.py b/tests/strands/composition/test_composition.py new file mode 100644 index 000000000..83e369719 --- /dev/null +++ b/tests/strands/composition/test_composition.py @@ -0,0 +1,418 @@ +"""Tests for strands.composition module.""" + +import pytest + +from strands.agent.manifest import AgentManifest, InputContract, OutputContract, Trigger +from strands.composition import ( + BindingResolver, + CompositionRegistry, + EventRouter, + LifecycleManager, + Route, +) + + +# --- Fixtures --- + + +def make_manifest(name, events_consumed=None, events_produced=None, + features_required=None, tool_capabilities=None, + knowledge_bases=None, trigger_type="event", trigger_condition=None): + """Helper to create manifests for testing.""" + return AgentManifest( + name=name, + version="1.0.0", + input_contract=InputContract( + events_consumed=events_consumed or [], + features_required=features_required or [], + tool_capabilities=tool_capabilities or [], + knowledge_bases=knowledge_bases or [], + ), + output_contract=OutputContract( + events_produced=events_produced or [], + ), + trigger=Trigger(type=trigger_type, condition=trigger_condition), + ) + + +class FakeAgent: + """Minimal agent-like object for testing.""" + def __init__(self, manifest): + self.manifest = manifest + + +# --- Registry Tests --- + + +class TestCompositionRegistry: + def test_register_agent(self): + registry = CompositionRegistry() + manifest = make_manifest("agent_a", events_produced=["EventA"]) + agent = FakeAgent(manifest) + registry.register(agent) + + assert "agent_a" in registry + assert len(registry) == 1 + assert registry.get("agent_a") is agent + + def test_register_manifest_only(self): + registry = CompositionRegistry() + manifest = make_manifest("agent_b", events_consumed=["EventA"]) + registry.register_manifest(manifest) + + assert "agent_b" in registry + assert registry.get("agent_b") is None # no agent instance + assert registry.get_manifest("agent_b") is manifest + + def test_register_without_manifest_raises(self): + registry = CompositionRegistry() + + class NoManifest: + manifest = None + + with pytest.raises(ValueError, match="without a manifest"): + registry.register(NoManifest()) + + def test_unregister(self): + registry = CompositionRegistry() + manifest = make_manifest("agent_a") + registry.register_manifest(manifest) + registry.unregister("agent_a") + + assert "agent_a" not in registry + + def test_producers_of(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("a", events_produced=["X"])) + registry.register_manifest(make_manifest("b", events_produced=["X", "Y"])) + registry.register_manifest(make_manifest("c", events_produced=["Y"])) + + assert set(registry.producers_of("X")) == {"a", "b"} + assert set(registry.producers_of("Y")) == {"b", "c"} + assert registry.producers_of("Z") == [] + + def test_consumers_of(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("a", events_consumed=["X"])) + registry.register_manifest(make_manifest("b", events_consumed=["X", "Y"])) + + assert set(registry.consumers_of("X")) == {"a", "b"} + assert registry.consumers_of("Y") == ["b"] + + +# --- Resolver Tests --- + + +class TestBindingResolver: + def test_all_satisfied(self): + resolver = BindingResolver( + available_events=["EventA"], + available_features=["feature_x"], + available_tools=["tool_y"], + available_knowledge_bases=["kb_z"], + ) + manifest = make_manifest( + "agent", + events_consumed=["EventA"], + features_required=["feature_x"], + tool_capabilities=["tool_y"], + knowledge_bases=["kb_z"], + ) + + status = resolver.evaluate(manifest) + + assert status.state == "active" + assert status.gaps == [] + assert "EventA" in status.resolved_events + assert "feature_x" in status.resolved_features + + def test_missing_event(self): + resolver = BindingResolver(available_events=[]) + manifest = make_manifest("agent", events_consumed=["MissingEvent"]) + + status = resolver.evaluate(manifest) + + assert status.state == "dormant" + assert len(status.gaps) == 1 + assert status.gaps[0].type == "event" + assert status.gaps[0].name == "MissingEvent" + + def test_missing_feature(self): + resolver = BindingResolver(available_features=[]) + manifest = make_manifest("agent", features_required=["missing_feature"]) + + status = resolver.evaluate(manifest) + + assert status.state == "dormant" + assert status.gaps[0].type == "feature" + + def test_missing_tool(self): + resolver = BindingResolver(available_tools=[]) + manifest = make_manifest("agent", tool_capabilities=["missing_tool"]) + + status = resolver.evaluate(manifest) + + assert status.state == "dormant" + assert status.gaps[0].type == "tool" + + def test_missing_knowledge_base(self): + resolver = BindingResolver(available_knowledge_bases=[]) + manifest = make_manifest("agent", knowledge_bases=["missing_kb"]) + + status = resolver.evaluate(manifest) + + assert status.state == "dormant" + assert status.gaps[0].type == "knowledge_base" + + def test_multiple_gaps(self): + resolver = BindingResolver() + manifest = make_manifest( + "agent", + events_consumed=["E"], + features_required=["F"], + tool_capabilities=["T"], + ) + + status = resolver.evaluate(manifest) + + assert status.state == "dormant" + assert len(status.gaps) == 3 + + def test_add_and_remove_resource(self): + resolver = BindingResolver() + manifest = make_manifest("agent", features_required=["feat"]) + + assert resolver.evaluate(manifest).state == "dormant" + + resolver.add_feature("feat") + assert resolver.evaluate(manifest).state == "active" + + resolver.remove_feature("feat") + assert resolver.evaluate(manifest).state == "dormant" + + def test_evaluate_all(self): + resolver = BindingResolver(available_events=["E1"]) + m1 = make_manifest("a", events_consumed=["E1"]) + m2 = make_manifest("b", events_consumed=["E2"]) + + results = resolver.evaluate_all([m1, m2]) + + assert results[0].state == "active" + assert results[1].state == "dormant" + + def test_what_would_activate(self): + resolver = BindingResolver(available_events=["E1"]) + m1 = make_manifest("a", events_consumed=["E1"]) # already active + m2 = make_manifest("b", events_consumed=["E1"], features_required=["F"]) # dormant + + would = resolver.what_would_activate("feature", "F", [m1, m2]) + + assert would == ["b"] + + def test_gap_report_format(self): + resolver = BindingResolver() + manifest = make_manifest("my_agent", features_required=["x"], tool_capabilities=["y"]) + + status = resolver.evaluate(manifest) + report = status.gap_report() + + assert "my_agent" in report + assert "dormant" in report + assert "feature: x" in report + assert "tool: y" in report + + +# --- Router Tests --- + + +class TestEventRouter: + def test_simple_route(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("producer", events_produced=["EventX"])) + registry.register_manifest(make_manifest("consumer", events_consumed=["EventX"])) + + router = EventRouter(registry) + routes = router.resolve_routes() + + assert len(routes) == 1 + assert routes[0].event == "EventX" + assert routes[0].source == "producer" + assert routes[0].targets == ["consumer"] + + def test_fan_out(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("producer", events_produced=["EventX"])) + registry.register_manifest(make_manifest("consumer_a", events_consumed=["EventX"])) + registry.register_manifest(make_manifest("consumer_b", events_consumed=["EventX"])) + + router = EventRouter(registry) + routes = router.resolve_routes() + + assert len(routes) == 1 + assert set(routes[0].targets) == {"consumer_a", "consumer_b"} + + def test_chain(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("a", events_produced=["E1"])) + registry.register_manifest(make_manifest("b", events_consumed=["E1"], events_produced=["E2"])) + registry.register_manifest(make_manifest("c", events_consumed=["E2"])) + + router = EventRouter(registry) + routes = router.resolve_routes() + + assert len(routes) == 2 + sources = {r.source for r in routes} + assert sources == {"a", "b"} + + def test_no_self_routing(self): + registry = CompositionRegistry() + registry.register_manifest( + make_manifest("agent", events_consumed=["E"], events_produced=["E"]) + ) + + router = EventRouter(registry) + routes = router.resolve_routes() + + assert routes == [] + + def test_dependency_order(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("a", events_produced=["E1"])) + registry.register_manifest(make_manifest("b", events_consumed=["E1"], events_produced=["E2"])) + registry.register_manifest(make_manifest("c", events_consumed=["E2"])) + + router = EventRouter(registry) + layers = router.dependency_order() + + assert layers[0] == ["a"] + assert layers[1] == ["b"] + assert layers[2] == ["c"] + + def test_routes_from(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("a", events_produced=["E1"])) + registry.register_manifest(make_manifest("b", events_consumed=["E1"])) + + router = EventRouter(registry) + + assert len(router.routes_from("a")) == 1 + assert router.routes_from("b") == [] + + def test_routes_to(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("a", events_produced=["E1"])) + registry.register_manifest(make_manifest("b", events_consumed=["E1"])) + + router = EventRouter(registry) + + assert router.routes_to("a") == [] + assert len(router.routes_to("b")) == 1 + + def test_visualize(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("a", events_produced=["E1"])) + registry.register_manifest(make_manifest("b", events_consumed=["E1"])) + + router = EventRouter(registry) + viz = router.visualize() + + assert "Event Flow:" in viz + assert "a" in viz + assert "E1" in viz + assert "b" in viz + + def test_empty_registry(self): + registry = CompositionRegistry() + router = EventRouter(registry) + + assert router.resolve_routes() == [] + assert router.visualize() == "(no routes)" + + +# --- Lifecycle Tests --- + + +class TestLifecycleManager: + def test_initial_evaluation(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("a", events_consumed=["E1"])) + registry.register_manifest(make_manifest("b")) + + resolver = BindingResolver(available_events=[]) + + manager = LifecycleManager(registry, resolver) + + assert "a" in manager.dormant_agents + assert "b" in manager.active_agents # no dependencies + + def test_add_resource_activates(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("agent", features_required=["feat"])) + + resolver = BindingResolver() + manager = LifecycleManager(registry, resolver) + + assert manager.dormant_agents == ["agent"] + + transitions = manager.add_resource("feature", "feat") + + assert len(transitions) == 1 + assert transitions[0].agent_name == "agent" + assert transitions[0].from_state == "dormant" + assert transitions[0].to_state == "active" + assert manager.active_agents == ["agent"] + + def test_remove_resource_deactivates(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("agent", features_required=["feat"])) + + resolver = BindingResolver(available_features=["feat"]) + manager = LifecycleManager(registry, resolver) + + assert manager.active_agents == ["agent"] + + transitions = manager.remove_resource("feature", "feat") + + assert transitions[0].to_state == "dormant" + assert manager.dormant_agents == ["agent"] + + def test_history(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("agent", features_required=["feat"])) + + resolver = BindingResolver() + manager = LifecycleManager(registry, resolver) + + manager.add_resource("feature", "feat") + manager.remove_resource("feature", "feat") + + assert len(manager.history) == 2 + assert manager.history[0].to_state == "active" + assert manager.history[1].to_state == "dormant" + + def test_get_state(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("agent", features_required=["feat"])) + + resolver = BindingResolver() + manager = LifecycleManager(registry, resolver) + + state = manager.get_state("agent") + + assert state.name == "agent" + assert state.state == "dormant" + assert len(state.binding_status.gaps) == 1 + + def test_summary(self): + registry = CompositionRegistry() + registry.register_manifest(make_manifest("active_one")) + registry.register_manifest(make_manifest("dormant_one", features_required=["missing"])) + + resolver = BindingResolver() + manager = LifecycleManager(registry, resolver) + + summary = manager.summary() + + assert "active_one" in summary + assert "dormant_one" in summary + assert "feature:missing" in summary