From 9afab4f06562cb46fc30c0c4a5312699cf5a042c Mon Sep 17 00:00:00 2001 From: Harrison Weinstock Date: Mon, 16 Mar 2026 16:56:49 +0000 Subject: [PATCH] fix(memory): paginate list_messages by converted message count, not raw event count --- src/bedrock_agentcore/_utils/pagination.py | 42 +++ .../integrations/strands/session_manager.py | 26 +- .../test_agentcore_memory_session_manager.py | 247 ++++++++++-------- ...memory_session_manager_openai_converter.py | 1 + .../integrations/test_session_manager.py | 26 ++ 5 files changed, 230 insertions(+), 112 deletions(-) create mode 100644 src/bedrock_agentcore/_utils/pagination.py diff --git a/src/bedrock_agentcore/_utils/pagination.py b/src/bedrock_agentcore/_utils/pagination.py new file mode 100644 index 00000000..7d139b8e --- /dev/null +++ b/src/bedrock_agentcore/_utils/pagination.py @@ -0,0 +1,42 @@ +"""Reusable pagination utility for fetching and converting paginated results.""" + +from typing import Any, Callable, TypeVar + +T = TypeVar("T") +R = TypeVar("R") + +DEFAULT_PAGE_SIZE = 100 + + +def paginate_for_n_results( + fetch_page: Callable[[dict[str, Any]], tuple[list[R], str | None]], + initial_params: dict[str, Any], + converter: Callable[[list[R]], list[T]], + target_count: int, +) -> list[T]: + """Paginate an arbitrary API, converting accumulated results after each page. + + The full accumulated list is re-converted after each page rather than converting + per-page, because some converters (e.g. events_to_messages) iterate the input in + reverse — converting per-page would produce incorrect ordering. + + Args: + fetch_page: Takes params dict, returns (items, next_token). next_token is None when exhausted. + initial_params: Base params for the first call. "nextToken" is added for subsequent pages. + converter: Converts accumulated raw items to desired output type. + target_count: Stop after collecting this many converted items. + """ + all_items: list[R] = [] + next_token: str | None = None + + while True: + params = {**initial_params} + if next_token is not None: + params["nextToken"] = next_token + + items, next_token = fetch_page(params) + all_items.extend(items) + + converted = converter(all_items) + if len(converted) >= target_count or next_token is None: + return converted[:target_count] diff --git a/src/bedrock_agentcore/memory/integrations/strands/session_manager.py b/src/bedrock_agentcore/memory/integrations/strands/session_manager.py index 68bc05b9..3a3cd042 100644 --- a/src/bedrock_agentcore/memory/integrations/strands/session_manager.py +++ b/src/bedrock_agentcore/memory/integrations/strands/session_manager.py @@ -19,6 +19,7 @@ from strands.types.session import Session, SessionAgent, SessionMessage from typing_extensions import override +from bedrock_agentcore._utils.pagination import DEFAULT_PAGE_SIZE, paginate_for_n_results from bedrock_agentcore.memory.client import MemoryClient from bedrock_agentcore.memory.models.filters import ( EventMetadataFilter, @@ -596,15 +597,24 @@ def list_messages( raise SessionException(f"Session ID mismatch: expected {self.config.session_id}, got {session_id}") try: - max_results = (limit + offset) if limit else MAX_FETCH_ALL_RESULTS - - events = self.memory_client.list_events( - memory_id=self.config.memory_id, - actor_id=self.config.actor_id, - session_id=session_id, - max_results=max_results, + target = (limit + offset) if limit else MAX_FETCH_ALL_RESULTS + + def fetch_page(params: dict) -> tuple[list, str | None]: + response = self.memory_client.gmdp_client.list_events(**params) + return response.get("events", []), response.get("nextToken") + + messages = paginate_for_n_results( + fetch_page=fetch_page, + initial_params={ + "memoryId": self.config.memory_id, + "actorId": self.config.actor_id, + "sessionId": session_id, + "maxResults": DEFAULT_PAGE_SIZE, + "includePayloads": True, + }, + converter=self.converter.events_to_messages, + target_count=target, ) - messages = self.converter.events_to_messages(events) if self.config.filter_restored_tool_context: messages = self._filter_restored_tool_context(messages) if limit is not None: diff --git a/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py b/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py index 3d872a6e..80736e79 100644 --- a/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py +++ b/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py @@ -302,36 +302,22 @@ def test_create_message(self, session_manager, mock_memory_client): def test_list_messages(self, session_manager, mock_memory_client): """Test listing messages.""" - mock_memory_client.list_events.return_value = [ - { - "eventId": "event-1", - "eventTimestamp": "2024-01-01T12:00:00Z", - "payload": [ - { - "conversational": { - "content": { - "text": '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}' - }, - "role": "USER", - } - } - ], - }, - { - "eventId": "event-2", - "eventTimestamp": "2024-01-01T12:00:00Z", - "payload": [ - { - "conversational": { - "content": { - "text": '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}' # noqa E501 - }, - "role": "ASSISTANT", - } - } - ], - }, - ] + user_text = '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}' + asst_text = '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}' + session_manager.memory_client.gmdp_client.list_events.return_value = { + "events": [ + { + "eventId": "event-1", + "eventTimestamp": "2024-01-01T12:00:00Z", + "payload": [{"conversational": {"content": {"text": user_text}, "role": "USER"}}], + }, + { + "eventId": "event-2", + "eventTimestamp": "2024-01-01T12:00:00Z", + "payload": [{"conversational": {"content": {"text": asst_text}, "role": "ASSISTANT"}}], + }, + ] + } messages = session_manager.list_messages("test-session-456", "test-agent-123") @@ -341,36 +327,22 @@ def test_list_messages(self, session_manager, mock_memory_client): def test_list_messages_returns_values_in_correct_reverse_order(self, session_manager, mock_memory_client): """Test listing messages.""" - mock_memory_client.list_events.return_value = [ - { - "eventId": "event-1", - "eventTimestamp": "2024-01-01T12:00:00Z", - "payload": [ - { - "conversational": { - "content": { - "text": '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}' - }, - "role": "USER", - } - } - ], - }, - { - "eventId": "event-2", - "eventTimestamp": "2024-01-01T12:00:00Z", - "payload": [ - { - "conversational": { - "content": { - "text": '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}' # noqa E501 - }, - "role": "ASSISTANT", - } - } - ], - }, - ] + user_text = '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}' + asst_text = '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}' + session_manager.memory_client.gmdp_client.list_events.return_value = { + "events": [ + { + "eventId": "event-1", + "eventTimestamp": "2024-01-01T12:00:00Z", + "payload": [{"conversational": {"content": {"text": user_text}, "role": "USER"}}], + }, + { + "eventId": "event-2", + "eventTimestamp": "2024-01-01T12:00:00Z", + "payload": [{"conversational": {"content": {"text": asst_text}, "role": "ASSISTANT"}}], + }, + ] + } messages = session_manager.list_messages("test-session-456", "test-agent-123") @@ -508,37 +480,39 @@ def test_update_message_wrong_session(self, session_manager): def test_list_messages_with_limit(self, session_manager, mock_memory_client): """Test listing messages with limit.""" - mock_memory_client.list_events.return_value = [ - { - "eventId": "event-1", - "eventTimestamp": "2024-01-01T12:00:00Z", - "payload": [ - { - "conversational": { - "content": { - "text": '{"message": {"role": "user", ' - '"content": [{"text": "Message 1"}]}, "message_id": 1}' - }, - "role": "USER", + session_manager.memory_client.gmdp_client.list_events.return_value = { + "events": [ + { + "eventId": "event-1", + "eventTimestamp": "2024-01-01T12:00:00Z", + "payload": [ + { + "conversational": { + "content": { + "text": '{"message": {"role": "user", ' + '"content": [{"text": "Message 1"}]}, "message_id": 1}' + }, + "role": "USER", + } } - } - ], - }, - { - "eventId": "event-2", - "eventTimestamp": "2024-01-01T12:00:00Z", - "payload": [ - { - "conversational": { - "content": { - "text": '{"message": {"role": "assistant", "content": [{"text": "Message 2"}]}, "message_id": 2}' # noqa E501 - }, - "role": "ASSISTANT", + ], + }, + { + "eventId": "event-2", + "eventTimestamp": "2024-01-01T12:00:00Z", + "payload": [ + { + "conversational": { + "content": { + "text": '{"message": {"role": "assistant", "content": [{"text": "Message 2"}]}, "message_id": 2}' # noqa E501 + }, + "role": "ASSISTANT", + } } - } - ], - }, - ] + ], + }, + ] + } messages = session_manager.list_messages("test-session-456", "test-agent-123", limit=1, offset=1) @@ -552,7 +526,7 @@ def test_list_messages_wrong_session(self, session_manager): def test_list_messages_exception(self, session_manager, mock_memory_client): """Test listing messages when exception occurs.""" - mock_memory_client.list_events.side_effect = Exception("API Error") + session_manager.memory_client.gmdp_client.list_events.side_effect = Exception("API Error") messages = session_manager.list_messages("test-session-456", "test-agent-123") @@ -1189,25 +1163,90 @@ def test_retrieve_customer_context_filters_by_relevance_score(self, mock_memory_ assert "Low relevance 1" not in injected_context assert "Low relevance 2" not in injected_context - def test_list_messages_default_max_results(self, session_manager, mock_memory_client): - """Test listing messages without limit uses default max_results=10000.""" - mock_memory_client.list_events.return_value = [] + def test_list_messages_with_limit_skips_state_events(self, session_manager, mock_memory_client): + """list_messages with limit returns exactly limit messages even when state events are mixed in. - session_manager.list_messages("test-session-456", "test-agent-123") + State events (session/agent blobs) share the same actorId as conversational events + after the metadata-based identification change. If list_messages counts raw events + toward the limit, state events consume slots and the caller gets fewer messages + than requested. + """ - mock_memory_client.list_events.assert_called_once() - call_kwargs = mock_memory_client.list_events.call_args[1] - assert call_kwargs["max_results"] == 10000 + def _conv_event(eid, text, role): + return { + "eventId": eid, + "payload": [ + { + "conversational": { + "content": { + "text": f'{{"message": {{"role": "{role}", ' + f'"content": [{{"text": "{text}"}}]}}, "message_id": {eid}}}' + }, + "role": role.upper(), + } + } + ], + } - def test_list_messages_with_limit_calculates_max_results(self, session_manager, mock_memory_client): - """Test listing messages with limit calculates max_results correctly.""" - mock_memory_client.list_events.return_value = [] + def _state_event(eid): + return { + "eventId": eid, + "payload": [{"blob": '{"session_id": "s", "session_type": "AGENT"}'}], + "metadata": {"stateType": {"stringValue": "SESSION"}}, + } + + # Page 1: 2 state + 3 conversational (5 raw events, only 3 convert to messages) + # Page 2: 3 more conversational + page1 = [ + _state_event("s1"), + _conv_event(1, "Hello", "user"), + _conv_event(2, "Hi", "assistant"), + _state_event("s2"), + _conv_event(3, "How are you?", "user"), + ] + page2 = [ + _conv_event(4, "Good", "assistant"), + _conv_event(5, "Great", "user"), + _conv_event(6, "Thanks", "assistant"), + ] + + mock_gmdp = session_manager.memory_client.gmdp_client + mock_gmdp.list_events.side_effect = [ + {"events": page1, "nextToken": "tok"}, + {"events": page2}, + ] + + messages = session_manager.list_messages("test-session-456", "test-agent-123", limit=5) + + assert len(messages) == 5 - session_manager.list_messages("test-session-456", "test-agent-123", limit=500, offset=50) + def test_list_messages_with_limit_returns_fewer_when_not_enough(self, session_manager, mock_memory_client): + """list_messages returns all available messages when fewer than limit exist.""" - mock_memory_client.list_events.assert_called_once() - call_kwargs = mock_memory_client.list_events.call_args[1] - assert call_kwargs["max_results"] == 550 # limit + offset + def _conv_event(eid, text, role): + return { + "eventId": eid, + "payload": [ + { + "conversational": { + "content": { + "text": f'{{"message": {{"role": "{role}", ' + f'"content": [{{"text": "{text}"}}]}}, "message_id": {eid}}}' + }, + "role": role.upper(), + } + } + ], + } + + mock_gmdp = session_manager.memory_client.gmdp_client + mock_gmdp.list_events.return_value = { + "events": [_conv_event(1, "Hello", "user"), _conv_event(2, "Hi", "assistant")] + } + + messages = session_manager.list_messages("test-session-456", "test-agent-123", limit=10) + + assert len(messages) == 2 def test_append_message_handles_none_from_create_message(self, session_manager, test_agent): """Test that append_message gracefully handles None return from create_message.""" diff --git a/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager_openai_converter.py b/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager_openai_converter.py index e16fd25a..b1fec477 100644 --- a/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager_openai_converter.py +++ b/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager_openai_converter.py @@ -94,6 +94,7 @@ def test_list_messages_filters_restored_tool_context(): manager.session_id = config.session_id manager.session = Session(session_id=config.session_id, session_type=SessionType.AGENT) + manager.memory_client.gmdp_client.list_events.return_value = {"events": [{"payload": []}]} manager.converter = Mock() manager.converter.events_to_messages.return_value = [ SessionMessage(message={"role": "user", "content": [{"text": "hello"}]}, message_id=0), diff --git a/tests_integ/memory/integrations/test_session_manager.py b/tests_integ/memory/integrations/test_session_manager.py index 5ab308ba..9033b3e7 100644 --- a/tests_integ/memory/integrations/test_session_manager.py +++ b/tests_integ/memory/integrations/test_session_manager.py @@ -375,3 +375,29 @@ def test_agent_multi_turn_with_batching(self, test_memory_stm): assert len(messages) >= 6 # endregion End-to-end agent with batching tests + + def test_list_messages_limit_excludes_state_events(self, test_memory_stm, memory_client): + """list_messages with limit returns the requested count even when state events are present. + + https://github.com/aws/bedrock-agentcore-sdk-python/pull/244 changed state events + (session/agent blobs) to share the same actorId as conversational events, distinguished + only by metadata. If list_messages counts raw events toward the limit, state events + consume slots and the caller gets fewer messages than requested. + """ + session_id = f"test-pagination-{uuid.uuid4().hex[:8]}" + actor_id = f"test-actor-{uuid.uuid4().hex[:8]}" + + config = AgentCoreMemoryConfig( + memory_id=test_memory_stm["id"], + session_id=session_id, + actor_id=actor_id, + ) + sm = AgentCoreMemorySessionManager(agentcore_memory_config=config, region_name=REGION) + + agent = Agent(system_prompt="You are a helpful assistant.", session_manager=sm) + agent("Remember the number 42") + agent("Remember the color blue") + agent("Remember the city Paris") + + messages = sm.list_messages(session_id, agent.agent_id, limit=4) + assert len(messages) == 4