Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/bedrock_agentcore/_utils/pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Reusable pagination utility for fetching and converting paginated results."""

from typing import Any, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")

DEFAULT_PAGE_SIZE = 100


def paginate_for_n_results(
fetch_page: Callable[[dict[str, Any]], tuple[list[R], str | None]],
initial_params: dict[str, Any],
converter: Callable[[list[R]], list[T]],
target_count: int,
) -> list[T]:
"""Paginate an arbitrary API, converting accumulated results after each page.

The full accumulated list is re-converted after each page rather than converting
per-page, because some converters (e.g. events_to_messages) iterate the input in
reverse — converting per-page would produce incorrect ordering.

Args:
fetch_page: Takes params dict, returns (items, next_token). next_token is None when exhausted.
initial_params: Base params for the first call. "nextToken" is added for subsequent pages.
converter: Converts accumulated raw items to desired output type.
target_count: Stop after collecting this many converted items.
"""
all_items: list[R] = []
next_token: str | None = None

while True:
params = {**initial_params}
if next_token is not None:
params["nextToken"] = next_token

items, next_token = fetch_page(params)
all_items.extend(items)

converted = converter(all_items)
if len(converted) >= target_count or next_token is None:
return converted[:target_count]
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from strands.types.session import Session, SessionAgent, SessionMessage
from typing_extensions import override

from bedrock_agentcore._utils.pagination import DEFAULT_PAGE_SIZE, paginate_for_n_results
from bedrock_agentcore.memory.client import MemoryClient
from bedrock_agentcore.memory.models.filters import (
EventMetadataFilter,
Expand Down Expand Up @@ -596,15 +597,24 @@ def list_messages(
raise SessionException(f"Session ID mismatch: expected {self.config.session_id}, got {session_id}")

try:
max_results = (limit + offset) if limit else MAX_FETCH_ALL_RESULTS

events = self.memory_client.list_events(
memory_id=self.config.memory_id,
actor_id=self.config.actor_id,
session_id=session_id,
max_results=max_results,
target = (limit + offset) if limit else MAX_FETCH_ALL_RESULTS

def fetch_page(params: dict) -> tuple[list, str | None]:
response = self.memory_client.gmdp_client.list_events(**params)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bypasses MemoryClient.list_events and calls gmdp_client.list_events directly. MemoryClient.list_events wraps the raw API with error handling (catches ClientError), parameter normalization, and filter construction — none of that applies here. It also means the session manager now has two different data-plane access patterns: every other method goes through MemoryClient, but list_messages reaches through to the internal client. If someone later adds logging or error normalization to MemoryClient.list_events, this code path silently misses it.

Recommendation: Add a paginated variant to MemoryClient — e.g., list_events_page(memory_id, actor_id, session_id, max_results, next_token) -> (events, next_token) — that wraps the raw API call with the same error handling. Then fetch_page calls that instead of reaching through to gmdp_client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The challenge here is that the list_events wrapper in MemoryClient already paginates, but does so in a way that is incompatible with this fix (still counts agent state towards max results).

I thought about trying to generalize the existing MemoryClient method, but doing so in a backwards-compatible way is going to be difficult.

For those reasons, I chose to drop down to the raw client to control the pagination myself.

return response.get("events", []), response.get("nextToken")

messages = paginate_for_n_results(
fetch_page=fetch_page,
initial_params={
"memoryId": self.config.memory_id,
"actorId": self.config.actor_id,
"sessionId": session_id,
"maxResults": DEFAULT_PAGE_SIZE,
"includePayloads": True,
},
converter=self.converter.events_to_messages,
target_count=target,
)
messages = self.converter.events_to_messages(events)
if self.config.filter_restored_tool_context:
messages = self._filter_restored_tool_context(messages)
if limit is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,36 +302,22 @@ def test_create_message(self, session_manager, mock_memory_client):

def test_list_messages(self, session_manager, mock_memory_client):
"""Test listing messages."""
mock_memory_client.list_events.return_value = [
{
"eventId": "event-1",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [
{
"conversational": {
"content": {
"text": '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}'
},
"role": "USER",
}
}
],
},
{
"eventId": "event-2",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [
{
"conversational": {
"content": {
"text": '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}' # noqa E501
},
"role": "ASSISTANT",
}
}
],
},
]
user_text = '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}'
asst_text = '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}'
session_manager.memory_client.gmdp_client.list_events.return_value = {
"events": [
{
"eventId": "event-1",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [{"conversational": {"content": {"text": user_text}, "role": "USER"}}],
},
{
"eventId": "event-2",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [{"conversational": {"content": {"text": asst_text}, "role": "ASSISTANT"}}],
},
]
}

messages = session_manager.list_messages("test-session-456", "test-agent-123")

Expand All @@ -341,36 +327,22 @@ def test_list_messages(self, session_manager, mock_memory_client):

def test_list_messages_returns_values_in_correct_reverse_order(self, session_manager, mock_memory_client):
"""Test listing messages."""
mock_memory_client.list_events.return_value = [
{
"eventId": "event-1",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [
{
"conversational": {
"content": {
"text": '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}'
},
"role": "USER",
}
}
],
},
{
"eventId": "event-2",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [
{
"conversational": {
"content": {
"text": '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}' # noqa E501
},
"role": "ASSISTANT",
}
}
],
},
]
user_text = '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}'
asst_text = '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}'
session_manager.memory_client.gmdp_client.list_events.return_value = {
"events": [
{
"eventId": "event-1",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [{"conversational": {"content": {"text": user_text}, "role": "USER"}}],
},
{
"eventId": "event-2",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [{"conversational": {"content": {"text": asst_text}, "role": "ASSISTANT"}}],
},
]
}

messages = session_manager.list_messages("test-session-456", "test-agent-123")

Expand Down Expand Up @@ -508,37 +480,39 @@ def test_update_message_wrong_session(self, session_manager):

def test_list_messages_with_limit(self, session_manager, mock_memory_client):
"""Test listing messages with limit."""
mock_memory_client.list_events.return_value = [
{
"eventId": "event-1",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [
{
"conversational": {
"content": {
"text": '{"message": {"role": "user", '
'"content": [{"text": "Message 1"}]}, "message_id": 1}'
},
"role": "USER",
session_manager.memory_client.gmdp_client.list_events.return_value = {
"events": [
{
"eventId": "event-1",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [
{
"conversational": {
"content": {
"text": '{"message": {"role": "user", '
'"content": [{"text": "Message 1"}]}, "message_id": 1}'
},
"role": "USER",
}
}
}
],
},
{
"eventId": "event-2",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [
{
"conversational": {
"content": {
"text": '{"message": {"role": "assistant", "content": [{"text": "Message 2"}]}, "message_id": 2}' # noqa E501
},
"role": "ASSISTANT",
],
},
{
"eventId": "event-2",
"eventTimestamp": "2024-01-01T12:00:00Z",
"payload": [
{
"conversational": {
"content": {
"text": '{"message": {"role": "assistant", "content": [{"text": "Message 2"}]}, "message_id": 2}' # noqa E501
},
"role": "ASSISTANT",
}
}
}
],
},
]
],
},
]
}

messages = session_manager.list_messages("test-session-456", "test-agent-123", limit=1, offset=1)

Expand All @@ -552,7 +526,7 @@ def test_list_messages_wrong_session(self, session_manager):

def test_list_messages_exception(self, session_manager, mock_memory_client):
"""Test listing messages when exception occurs."""
mock_memory_client.list_events.side_effect = Exception("API Error")
session_manager.memory_client.gmdp_client.list_events.side_effect = Exception("API Error")

messages = session_manager.list_messages("test-session-456", "test-agent-123")

Expand Down Expand Up @@ -1189,25 +1163,90 @@ def test_retrieve_customer_context_filters_by_relevance_score(self, mock_memory_
assert "Low relevance 1" not in injected_context
assert "Low relevance 2" not in injected_context

def test_list_messages_default_max_results(self, session_manager, mock_memory_client):
"""Test listing messages without limit uses default max_results=10000."""
mock_memory_client.list_events.return_value = []
def test_list_messages_with_limit_skips_state_events(self, session_manager, mock_memory_client):
"""list_messages with limit returns exactly limit messages even when state events are mixed in.

session_manager.list_messages("test-session-456", "test-agent-123")
State events (session/agent blobs) share the same actorId as conversational events
after the metadata-based identification change. If list_messages counts raw events
toward the limit, state events consume slots and the caller gets fewer messages
than requested.
"""

mock_memory_client.list_events.assert_called_once()
call_kwargs = mock_memory_client.list_events.call_args[1]
assert call_kwargs["max_results"] == 10000
def _conv_event(eid, text, role):
return {
"eventId": eid,
"payload": [
{
"conversational": {
"content": {
"text": f'{{"message": {{"role": "{role}", '
f'"content": [{{"text": "{text}"}}]}}, "message_id": {eid}}}'
},
"role": role.upper(),
}
}
],
}

def test_list_messages_with_limit_calculates_max_results(self, session_manager, mock_memory_client):
"""Test listing messages with limit calculates max_results correctly."""
mock_memory_client.list_events.return_value = []
def _state_event(eid):
return {
"eventId": eid,
"payload": [{"blob": '{"session_id": "s", "session_type": "AGENT"}'}],
"metadata": {"stateType": {"stringValue": "SESSION"}},
}

# Page 1: 2 state + 3 conversational (5 raw events, only 3 convert to messages)
# Page 2: 3 more conversational
page1 = [
_state_event("s1"),
_conv_event(1, "Hello", "user"),
_conv_event(2, "Hi", "assistant"),
_state_event("s2"),
_conv_event(3, "How are you?", "user"),
]
page2 = [
_conv_event(4, "Good", "assistant"),
_conv_event(5, "Great", "user"),
_conv_event(6, "Thanks", "assistant"),
]

mock_gmdp = session_manager.memory_client.gmdp_client
mock_gmdp.list_events.side_effect = [
{"events": page1, "nextToken": "tok"},
{"events": page2},
]

messages = session_manager.list_messages("test-session-456", "test-agent-123", limit=5)

assert len(messages) == 5

session_manager.list_messages("test-session-456", "test-agent-123", limit=500, offset=50)
def test_list_messages_with_limit_returns_fewer_when_not_enough(self, session_manager, mock_memory_client):
"""list_messages returns all available messages when fewer than limit exist."""

mock_memory_client.list_events.assert_called_once()
call_kwargs = mock_memory_client.list_events.call_args[1]
assert call_kwargs["max_results"] == 550 # limit + offset
def _conv_event(eid, text, role):
return {
"eventId": eid,
"payload": [
{
"conversational": {
"content": {
"text": f'{{"message": {{"role": "{role}", '
f'"content": [{{"text": "{text}"}}]}}, "message_id": {eid}}}'
},
"role": role.upper(),
}
}
],
}

mock_gmdp = session_manager.memory_client.gmdp_client
mock_gmdp.list_events.return_value = {
"events": [_conv_event(1, "Hello", "user"), _conv_event(2, "Hi", "assistant")]
}

messages = session_manager.list_messages("test-session-456", "test-agent-123", limit=10)

assert len(messages) == 2

def test_append_message_handles_none_from_create_message(self, session_manager, test_agent):
"""Test that append_message gracefully handles None return from create_message."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def test_list_messages_filters_restored_tool_context():
manager.session_id = config.session_id
manager.session = Session(session_id=config.session_id, session_type=SessionType.AGENT)

manager.memory_client.gmdp_client.list_events.return_value = {"events": [{"payload": []}]}
manager.converter = Mock()
manager.converter.events_to_messages.return_value = [
SessionMessage(message={"role": "user", "content": [{"text": "hello"}]}, message_id=0),
Expand Down
Loading
Loading