Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from datetime import datetime, timezone
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, NamedTuple, Optional

Expand Down Expand Up @@ -98,38 +98,6 @@ class AgentCoreMemorySessionManager(RepositorySessionManager, SessionRepository)
- Consistent with existing Strands Session managers (such as: FileSessionManager, S3SessionManager)
"""

# Class-level timestamp tracking for monotonic ordering
_timestamp_lock = threading.Lock()
_last_timestamp: Optional[datetime] = None

@classmethod
def _get_monotonic_timestamp(cls, desired_timestamp: Optional[datetime] = None) -> datetime:
"""Get a monotonically increasing timestamp.

Args:
desired_timestamp (Optional[datetime]): The desired timestamp. If None, uses current time.

Returns:
datetime: A timestamp guaranteed to be greater than any previously returned timestamp.
"""
if desired_timestamp is None:
desired_timestamp = datetime.now(timezone.utc)

with cls._timestamp_lock:
if cls._last_timestamp is None:
cls._last_timestamp = desired_timestamp
return desired_timestamp

# Why the 1 second check? Because Boto3 does NOT support sub 1 second resolution.
if desired_timestamp <= cls._last_timestamp + timedelta(seconds=1):
# Increment by 1 second to ensure ordering
new_timestamp = cls._last_timestamp + timedelta(seconds=1)
else:
new_timestamp = desired_timestamp

cls._last_timestamp = new_timestamp
return new_timestamp

def __init__(
self,
agentcore_memory_config: AgentCoreMemoryConfig,
Expand Down Expand Up @@ -282,7 +250,7 @@ def create_session(self, session: Session, **kwargs: Any) -> Session:
payload=[
{"blob": json.dumps(session.to_dict())},
],
eventTimestamp=self._get_monotonic_timestamp(),
eventTimestamp=datetime.now(timezone.utc),
metadata={STATE_TYPE_KEY: {"stringValue": StateType.SESSION.value}},
)
logger.info("Created session: %s with event: %s", session.session_id, event.get("event", {}).get("eventId"))
Expand Down Expand Up @@ -414,7 +382,7 @@ def create_agent(self, session_id: str, session_agent: SessionAgent, **kwargs: A
payload=[
{"blob": json.dumps(session_agent.to_dict())},
],
eventTimestamp=self._get_monotonic_timestamp(),
eventTimestamp=datetime.now(timezone.utc),
metadata={
STATE_TYPE_KEY: {"stringValue": StateType.AGENT.value},
AGENT_ID_KEY: {"stringValue": session_agent.agent_id},
Expand Down Expand Up @@ -582,7 +550,6 @@ def create_message(

# Parse the original timestamp and use it as desired timestamp
original_timestamp = datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00"))
monotonic_timestamp = self._get_monotonic_timestamp(original_timestamp)

if self.config.batch_size > 1:
# Buffer the pre-processed message
Expand All @@ -593,7 +560,7 @@ def create_message(
session_id=session_id,
messages=messages,
is_blob=is_blob,
timestamp=monotonic_timestamp,
timestamp=original_timestamp,
metadata=merged_metadata,
)
)
Expand All @@ -613,7 +580,7 @@ def create_message(
actor_id=self.config.actor_id,
session_id=session_id,
messages=messages,
event_timestamp=monotonic_timestamp,
event_timestamp=original_timestamp,
metadata=merged_metadata,
)
else:
Expand All @@ -622,7 +589,7 @@ def create_message(
"actorId": self.config.actor_id,
"sessionId": session_id,
"payload": [{"blob": json.dumps(messages[0])}],
"eventTimestamp": monotonic_timestamp,
"eventTimestamp": original_timestamp,
}
if merged_metadata:
create_event_kwargs["metadata"] = merged_metadata
Expand Down Expand Up @@ -1177,7 +1144,7 @@ def _flush_agent_states_only(self) -> list[dict[str, Any]]:
actorId=self.config.actor_id,
sessionId=self.config.session_id,
payload=payloads,
eventTimestamp=self._get_monotonic_timestamp(),
eventTimestamp=datetime.now(timezone.utc),
metadata={
STATE_TYPE_KEY: {"stringValue": StateType.AGENT.value},
AGENT_ID_KEY: {"stringValue": agent_id},
Expand Down