Skip to content

Commit cd2f2a0

Browse files
authored
feat(strands-memory): add event metadata support to AgentCoreMemorySessionManager (#339)
* feat(strands-memory): add user-supplied event metadata support to AgentCoreMemorySessionManager Allow users to attach custom key-value metadata to conversation events via a new `default_metadata` config field and per-call `metadata` kwarg. Metadata is merged (per-call > config defaults > internal) and validated against reserved keys and the 15-key API limit. Also refactors the internal message buffer from a raw tuple to a `BufferedMessage` NamedTuple for clarity and extensibility. Closes #149 (Phase 1: Metadata) * feat(strands-memory): add metadata_provider for dynamic per-invocation metadata Add `metadata_provider` config field — a callable invoked at each event creation, enabling dynamic metadata like traceId that changes per agent invocation. This solves the Langfuse/user-feedback use case where a static `default_metadata` is insufficient because Strands controls the append_message → create_message call path. Merge precedence: default_metadata < metadata_provider() < per-call kwargs < internal keys. * fix: address PR review feedback - Auto-normalize plain string metadata values to {"stringValue": ...} so users can write {"project": "atlas"} instead of the verbose form. Applied via pydantic validator on default_metadata and at runtime for metadata_provider return values. - Move inline datetime imports to top of test file (nit from Hweinstock) - Fix lint/format issues that caused CI Lint and Format check to fail - Add tests for normalization in both config and session manager * fix: remove unnecessary model_config from AgentCoreMemoryConfig Pydantic v2 handles Callable natively, so arbitrary_types_allowed is not needed. Removing it avoids any risk of breaking subclasses or downstream validators.
1 parent 2f4f297 commit cd2f2a0

6 files changed

Lines changed: 722 additions & 63 deletions

File tree

src/bedrock_agentcore/memory/integrations/strands/README.md

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ result = agent_with_tools("/path/to/image.png")
219219
- `actor_id`: Unique identifier for the user/actor
220220
- `retrieval_config`: Dictionary mapping namespaces to RetrievalConfig objects
221221
- `batch_size`: Number of messages to buffer before sending to AgentCore Memory (1-100, default: 1). A value of 1 sends immediately (no batching).
222+
- `default_metadata`: Optional dictionary of key-value metadata to attach to every message event. Maximum 15 total keys per event (including internal keys). Example: `{"location": {"stringValue": "NYC"}}`
223+
- `metadata_provider`: Optional callable returning a metadata dictionary. Called at each event creation for dynamic values (e.g., traceId). Merged after `default_metadata`.
222224

223225
### RetrievalConfig Parameters
224226

@@ -239,6 +241,71 @@ https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory-strategies.
239241
- `/summaries/{actorId}/{sessionId}/`: Session-specific summaries
240242

241243

244+
---
245+
246+
## Event Metadata
247+
248+
You can attach custom key-value metadata to every message event. This is useful for tagging
249+
conversations with contextual information (e.g., location, project, case type) that can later
250+
be used to filter events with `list_events`.
251+
252+
### Default Metadata (applied to all messages)
253+
254+
```python
255+
config = AgentCoreMemoryConfig(
256+
memory_id=MEM_ID,
257+
session_id=SESSION_ID,
258+
actor_id=ACTOR_ID,
259+
default_metadata={
260+
"project": "atlas",
261+
"env": "production",
262+
},
263+
)
264+
session_manager = AgentCoreMemorySessionManager(config, region_name='us-east-1')
265+
agent = Agent(session_manager=session_manager)
266+
agent("Hello!") # This event will have project=atlas and env=production metadata
267+
```
268+
269+
> Plain strings are auto-wrapped to `{"stringValue": "..."}`. The explicit form
270+
> `{"project": {"stringValue": "atlas"}}` also works.
271+
272+
### Dynamic Metadata (metadata_provider)
273+
274+
For values that change per invocation (e.g., traceId for Langfuse), use `metadata_provider`
275+
a callable invoked at each event creation:
276+
277+
```python
278+
from langfuse.decorators import langfuse_context
279+
280+
def get_trace_metadata():
281+
return {"traceId": langfuse_context.get_current_trace_id() or ""}
282+
283+
config = AgentCoreMemoryConfig(
284+
memory_id=MEM_ID,
285+
session_id=SESSION_ID,
286+
actor_id=ACTOR_ID,
287+
metadata_provider=get_trace_metadata,
288+
)
289+
session_manager = AgentCoreMemorySessionManager(config, region_name='us-east-1')
290+
agent = Agent(session_manager=session_manager)
291+
agent("Hello!") # Event gets the current traceId automatically
292+
```
293+
294+
### Per-call Metadata
295+
296+
You can also pass metadata on individual `create_message` calls. Per-call metadata is merged
297+
with `default_metadata` and `metadata_provider` (per-call values override both for the same key):
298+
299+
```python
300+
session_manager.create_message(
301+
session_id, agent_id, message,
302+
metadata={"priority": "high"},
303+
)
304+
```
305+
306+
> **Note:** The API allows a maximum of 15 metadata key-value pairs per event.
307+
> The keys `stateType` and `agentId` are reserved for internal use.
308+
242309
---
243310

244311
## Message Batching

src/bedrock_agentcore/memory/integrations/strands/config.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
"""Configuration for AgentCore Memory Session Manager."""
22

3-
from typing import Dict, Optional
3+
from typing import Any, Callable, Dict, Optional
44

5-
from pydantic import BaseModel, Field
5+
from pydantic import BaseModel, Field, field_validator
6+
7+
8+
def normalize_metadata(raw: Dict[str, Any]) -> Dict[str, Any]:
9+
"""Normalize metadata values: plain strings become {"stringValue": value}."""
10+
return {k: {"stringValue": v} if isinstance(v, str) else v for k, v in raw.items()}
611

712

813
class RetrievalConfig(BaseModel):
@@ -38,6 +43,14 @@ class AgentCoreMemoryConfig(BaseModel):
3843
Default is "user_context".
3944
filter_restored_tool_context: When True, strip historical toolUse/toolResult blocks from
4045
restored messages before loading them into Strands runtime memory. Default is False.
46+
default_metadata: Optional default metadata key-value pairs to attach to every message event.
47+
Merged with any per-call metadata. Maximum 15 total keys per event (including internal keys).
48+
Accepts plain strings (auto-wrapped) or explicit MetadataValue dicts.
49+
Example: {"location": "NYC"} or {"location": {"stringValue": "NYC"}}
50+
metadata_provider: Optional callable that returns metadata key-value pairs. Called at each
51+
event creation, so it can return dynamic values (e.g. current traceId). The returned
52+
dict is merged after default_metadata but before per-call metadata.
53+
Accepts plain strings (auto-wrapped) or explicit MetadataValue dicts.
4154
"""
4255

4356
memory_id: str = Field(min_length=1)
@@ -48,3 +61,12 @@ class AgentCoreMemoryConfig(BaseModel):
4861
flush_interval_seconds: Optional[float] = Field(default=None, gt=0)
4962
context_tag: str = Field(default="user_context", min_length=1)
5063
filter_restored_tool_context: bool = Field(default=False)
64+
default_metadata: Optional[Dict[str, Any]] = None
65+
metadata_provider: Optional[Callable[[], Dict[str, Any]]] = None
66+
67+
@field_validator("default_metadata", mode="before")
68+
@classmethod
69+
def _normalize_default_metadata(cls, v: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
70+
if v is None:
71+
return None
72+
return normalize_metadata(v)

src/bedrock_agentcore/memory/integrations/strands/session_manager.py

Lines changed: 119 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from concurrent.futures import ThreadPoolExecutor, as_completed
77
from datetime import datetime, timedelta, timezone
88
from enum import Enum
9-
from typing import TYPE_CHECKING, Any, Optional
9+
from typing import TYPE_CHECKING, Any, Dict, NamedTuple, Optional
1010

1111
import boto3
1212
from botocore.config import Config as BotocoreConfig
@@ -23,12 +23,13 @@
2323
from bedrock_agentcore.memory.models.filters import (
2424
EventMetadataFilter,
2525
LeftExpression,
26+
MetadataValue,
2627
OperatorType,
2728
RightExpression,
2829
)
2930

3031
from .bedrock_converter import AgentCoreMemoryConverter
31-
from .config import AgentCoreMemoryConfig, RetrievalConfig
32+
from .config import AgentCoreMemoryConfig, RetrievalConfig, normalize_metadata
3233
from .converters import MemoryConverter
3334

3435
if TYPE_CHECKING:
@@ -46,6 +47,22 @@
4647
STATE_TYPE_KEY = "stateType"
4748
AGENT_ID_KEY = "agentId"
4849

50+
# Maximum metadata key-value pairs per event (API limit)
51+
MAX_METADATA_KEYS = 15
52+
53+
# Reserved internal metadata keys that users cannot override
54+
RESERVED_METADATA_KEYS = frozenset({STATE_TYPE_KEY, AGENT_ID_KEY})
55+
56+
57+
class BufferedMessage(NamedTuple):
58+
"""A pre-processed message waiting to be flushed to AgentCore Memory."""
59+
60+
session_id: str
61+
messages: list[tuple[str, str]]
62+
is_blob: bool
63+
timestamp: datetime
64+
metadata: Optional[Dict[str, MetadataValue]] = None
65+
4966

5067
class StateType(Enum):
5168
"""State type for distinguishing session and agent metadata in events."""
@@ -129,8 +146,8 @@ def __init__(
129146
session = boto_session or boto3.Session(region_name=region_name)
130147
self.has_existing_agent = False
131148

132-
# Batching support - stores pre-processed messages: (session_id, messages, is_blob, timestamp)
133-
self._message_buffer: list[tuple[str, list[tuple[str, str]], bool, datetime]] = []
149+
# Batching support - stores pre-processed messages
150+
self._message_buffer: list[BufferedMessage] = []
134151
self._message_lock = threading.Lock()
135152

136153
# Agent state buffering - stores all agent state updates: (session_id, agent)
@@ -169,6 +186,55 @@ def __init__(
169186
if self.config.flush_interval_seconds:
170187
self._start_flush_timer()
171188

189+
def _build_metadata(
190+
self,
191+
internal_metadata: Optional[Dict[str, MetadataValue]] = None,
192+
per_call_metadata: Optional[Dict[str, MetadataValue]] = None,
193+
) -> Optional[Dict[str, MetadataValue]]:
194+
"""Build merged metadata from config defaults, provider, per-call overrides, and internal keys.
195+
196+
Merge precedence (highest wins):
197+
1. internal_metadata (stateType, agentId) — always wins
198+
2. per_call_metadata (passed via **kwargs)
199+
3. metadata_provider() (called at event creation time for dynamic values)
200+
4. self.config.default_metadata (set at config construction time)
201+
202+
Args:
203+
internal_metadata: System-reserved metadata (e.g. stateType, agentId).
204+
per_call_metadata: Caller-supplied metadata for a single operation.
205+
206+
Returns:
207+
Merged metadata dict, or None if empty.
208+
209+
Raises:
210+
ValueError: If user metadata contains reserved keys or total keys exceed MAX_METADATA_KEYS.
211+
"""
212+
merged: Dict[str, MetadataValue] = {}
213+
214+
if self.config.default_metadata:
215+
merged.update(self.config.default_metadata)
216+
217+
if self.config.metadata_provider:
218+
merged.update(normalize_metadata(self.config.metadata_provider()))
219+
220+
if per_call_metadata:
221+
merged.update(per_call_metadata)
222+
223+
# Validate user-supplied keys before merging internal keys
224+
user_reserved = RESERVED_METADATA_KEYS & merged.keys()
225+
if user_reserved:
226+
raise ValueError(
227+
f"Metadata keys {user_reserved} are reserved for internal use. Reserved keys: {RESERVED_METADATA_KEYS}"
228+
)
229+
230+
if internal_metadata:
231+
merged.update(internal_metadata)
232+
233+
if len(merged) > MAX_METADATA_KEYS:
234+
raise ValueError(f"Combined metadata has {len(merged)} keys, exceeding the maximum of {MAX_METADATA_KEYS}.")
235+
236+
return merged or None
237+
172238
# region SessionRepository interface implementation
173239
def create_session(self, session: Session, **kwargs: Any) -> Session:
174240
"""Create a new session in AgentCore Memory.
@@ -482,6 +548,9 @@ def create_message(
482548

483549
is_blob = self.converter.exceeds_conversational_limit(messages[0])
484550

551+
# Build merged metadata from config defaults + per-call overrides
552+
merged_metadata = self._build_metadata(per_call_metadata=kwargs.get("metadata"))
553+
485554
# Parse the original timestamp and use it as desired timestamp
486555
original_timestamp = datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00"))
487556
monotonic_timestamp = self._get_monotonic_timestamp(original_timestamp)
@@ -490,7 +559,15 @@ def create_message(
490559
# Buffer the pre-processed message
491560
should_flush = False
492561
with self._message_lock:
493-
self._message_buffer.append((session_id, messages, is_blob, monotonic_timestamp))
562+
self._message_buffer.append(
563+
BufferedMessage(
564+
session_id=session_id,
565+
messages=messages,
566+
is_blob=is_blob,
567+
timestamp=monotonic_timestamp,
568+
metadata=merged_metadata,
569+
)
570+
)
494571
should_flush = len(self._message_buffer) >= self.config.batch_size
495572

496573
# Flush only messages outside the lock to prevent deadlock
@@ -508,17 +585,19 @@ def create_message(
508585
session_id=session_id,
509586
messages=messages,
510587
event_timestamp=monotonic_timestamp,
588+
metadata=merged_metadata,
511589
)
512590
else:
513-
event = self.memory_client.gmdp_client.create_event(
514-
memoryId=self.config.memory_id,
515-
actorId=self.config.actor_id,
516-
sessionId=session_id,
517-
payload=[
518-
{"blob": json.dumps(messages[0])},
519-
],
520-
eventTimestamp=monotonic_timestamp,
521-
)
591+
create_event_kwargs: dict[str, Any] = {
592+
"memoryId": self.config.memory_id,
593+
"actorId": self.config.actor_id,
594+
"sessionId": session_id,
595+
"payload": [{"blob": json.dumps(messages[0])}],
596+
"eventTimestamp": monotonic_timestamp,
597+
}
598+
if merged_metadata:
599+
create_event_kwargs["metadata"] = merged_metadata
600+
event = self.memory_client.gmdp_client.create_event(**create_event_kwargs)
522601
logger.debug("Created event: %s for message: %s", event.get("eventId"), session_message.message_id)
523602
return event
524603
except Exception as e:
@@ -790,39 +869,45 @@ def _flush_messages_only(self) -> list[dict[str, Any]]:
790869
return []
791870

792871
# Group all messages by session_id, combining conversational and blob messages
793-
# Structure: {session_id: {"payload": [...], "timestamp": latest_timestamp}}
872+
# Structure: {session_id: {"payload": [...], "timestamp": latest_timestamp, "metadata": {...}}}
794873
session_groups: dict[str, dict[str, Any]] = {}
795874

796-
for session_id, messages, is_blob, monotonic_timestamp in messages_to_send:
797-
if session_id not in session_groups:
798-
session_groups[session_id] = {"payload": [], "timestamp": monotonic_timestamp}
875+
for buffered_msg in messages_to_send:
876+
sid = buffered_msg.session_id
877+
if sid not in session_groups:
878+
session_groups[sid] = {"payload": [], "timestamp": buffered_msg.timestamp, "metadata": {}}
799879

800-
if is_blob:
801-
# Add blob messages to payload
802-
for msg in messages:
803-
session_groups[session_id]["payload"].append({"blob": json.dumps(msg)})
880+
if buffered_msg.is_blob:
881+
for msg in buffered_msg.messages:
882+
session_groups[sid]["payload"].append({"blob": json.dumps(msg)})
804883
else:
805-
# Add conversational messages to payload
806-
for text, role in messages:
807-
session_groups[session_id]["payload"].append(
884+
for text, role in buffered_msg.messages:
885+
session_groups[sid]["payload"].append(
808886
{"conversational": {"content": {"text": text}, "role": role.upper()}}
809887
)
810888

811889
# Use the latest timestamp for the combined event
812-
if monotonic_timestamp > session_groups[session_id]["timestamp"]:
813-
session_groups[session_id]["timestamp"] = monotonic_timestamp
890+
if buffered_msg.timestamp > session_groups[sid]["timestamp"]:
891+
session_groups[sid]["timestamp"] = buffered_msg.timestamp
892+
893+
# Merge metadata (later entries override earlier for same key)
894+
if buffered_msg.metadata:
895+
session_groups[sid]["metadata"].update(buffered_msg.metadata)
814896

815897
results = []
816898
try:
817899
# Send one create_event per session_id with all messages (conversational + blob)
818900
for session_id, group in session_groups.items():
819-
event = self.memory_client.gmdp_client.create_event(
820-
memoryId=self.config.memory_id,
821-
actorId=self.config.actor_id,
822-
sessionId=session_id,
823-
payload=group["payload"],
824-
eventTimestamp=group["timestamp"],
825-
)
901+
create_event_kwargs: dict[str, Any] = {
902+
"memoryId": self.config.memory_id,
903+
"actorId": self.config.actor_id,
904+
"sessionId": session_id,
905+
"payload": group["payload"],
906+
"eventTimestamp": group["timestamp"],
907+
}
908+
if group["metadata"]:
909+
create_event_kwargs["metadata"] = group["metadata"]
910+
event = self.memory_client.gmdp_client.create_event(**create_event_kwargs)
826911
results.append(event)
827912
logger.debug(
828913
"Flushed batched event for session %s with %d messages: %s",

0 commit comments

Comments
 (0)