From 43a799fc2f282937b94e48da5ce20528ce578dfe Mon Sep 17 00:00:00 2001 From: "T.J Ariyawansa" Date: Sat, 31 Jan 2026 11:54:53 -0500 Subject: [PATCH 1/3] feat(memory): use event metadata for session/agent state identification Replace actorId prefix-based approach with event metadata for distinguishing session and agent state events. Add auto-migration for legacy events. Changes: - Add StateType enum (SESSION, AGENT) and metadata keys - Update create_session/create_agent to include stateType metadata - Update read_session/read_agent to filter by metadata - Add backwards-compatible auto-migration: legacy events are converted to new format on read (create new with metadata, delete old) - Add tests for legacy migration behavior --- .../integrations/strands/session_manager.py | 152 ++++++++++++------ .../test_agentcore_memory_session_manager.py | 78 +++++++-- 2 files changed, 167 insertions(+), 63 deletions(-) diff --git a/src/bedrock_agentcore/memory/integrations/strands/session_manager.py b/src/bedrock_agentcore/memory/integrations/strands/session_manager.py index c3912a7..502586c 100644 --- a/src/bedrock_agentcore/memory/integrations/strands/session_manager.py +++ b/src/bedrock_agentcore/memory/integrations/strands/session_manager.py @@ -5,6 +5,7 @@ import threading from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timedelta, timezone +from enum import Enum from typing import TYPE_CHECKING, Any, Optional import boto3 @@ -19,6 +20,7 @@ from typing_extensions import override from bedrock_agentcore.memory.client import MemoryClient +from bedrock_agentcore.memory.models.filters import EventMetadataFilter, LeftExpression, OperatorType, RightExpression from .bedrock_converter import AgentCoreMemoryConverter from .config import AgentCoreMemoryConfig, RetrievalConfig @@ -28,11 +30,23 @@ logger = logging.getLogger(__name__) -SESSION_PREFIX = "session_" -AGENT_PREFIX = "agent_" -MESSAGE_PREFIX = "message_" MAX_FETCH_ALL_RESULTS = 10000 +# Legacy prefixes for backwards compatibility with old events +LEGACY_SESSION_PREFIX = "session_" +LEGACY_AGENT_PREFIX = "agent_" + +# Metadata keys for event identification +STATE_TYPE_KEY = "stateType" +AGENT_ID_KEY = "agentId" + + +class StateType(Enum): + """State type for distinguishing session and agent metadata in events.""" + + SESSION = "SESSION" + AGENT = "AGENT" + class AgentCoreMemorySessionManager(RepositorySessionManager, SessionRepository): """AgentCore Memory-based session manager for Bedrock AgentCore Memory integration. @@ -125,38 +139,6 @@ def __init__( ) super().__init__(session_id=self.config.session_id, session_repository=self) - def _get_full_session_id(self, session_id: str) -> str: - """Get the full session ID with the configured prefix. - - Args: - session_id (str): The session ID. - - Returns: - str: The full session ID with the prefix. - """ - full_session_id = f"{SESSION_PREFIX}{session_id}" - if full_session_id == self.config.actor_id: - raise SessionException( - f"Cannot have session [ {full_session_id} ] with the same ID as the actor ID: {self.config.actor_id}" - ) - return full_session_id - - def _get_full_agent_id(self, agent_id: str) -> str: - """Get the full agent ID with the configured prefix. - - Args: - agent_id (str): The agent ID. - - Returns: - str: The full agent ID with the prefix. - """ - full_agent_id = f"{AGENT_PREFIX}{agent_id}" - if full_agent_id == self.config.actor_id: - raise SessionException( - f"Cannot create agent [ {full_agent_id} ] with the same ID as the actor ID: {self.config.actor_id}" - ) - return full_agent_id - # region SessionRepository interface implementation def create_session(self, session: Session, **kwargs: Any) -> Session: """Create a new session in AgentCore Memory. @@ -179,12 +161,13 @@ def create_session(self, session: Session, **kwargs: Any) -> Session: event = self.memory_client.gmdp_client.create_event( memoryId=self.config.memory_id, - actorId=self._get_full_session_id(session.session_id), + actorId=self.config.actor_id, sessionId=self.session_id, payload=[ {"blob": json.dumps(session.to_dict())}, ], eventTimestamp=self._get_monotonic_timestamp(), + metadata={STATE_TYPE_KEY: {"stringValue": StateType.SESSION.value}}, ) logger.info("Created session: %s with event: %s", session.session_id, event.get("event", {}).get("eventId")) return session @@ -206,17 +189,50 @@ def read_session(self, session_id: str, **kwargs: Any) -> Optional[Session]: if session_id != self.config.session_id: return None + # 1. Try new approach (metadata filter) + event_metadata = [ + EventMetadataFilter.build_expression( + left_operand=LeftExpression.build(STATE_TYPE_KEY), + operator=OperatorType.EQUALS_TO, + right_operand=RightExpression.build(StateType.SESSION.value), + ) + ] + events = self.memory_client.list_events( memory_id=self.config.memory_id, - actor_id=self._get_full_session_id(session_id), + actor_id=self.config.actor_id, session_id=session_id, + event_metadata=event_metadata, max_results=1, ) - if not events: - return None + if events: + session_data = json.loads(events[0].get("payload", {})[0].get("blob")) + return Session.from_dict(session_data) + + # 2. Fallback: check for legacy event and migrate + legacy_actor_id = f"{LEGACY_SESSION_PREFIX}{session_id}" + events = self.memory_client.list_events( + memory_id=self.config.memory_id, + actor_id=legacy_actor_id, + session_id=session_id, + max_results=1, + ) + if events: + old_event = events[0] + session_data = json.loads(old_event.get("payload", {})[0].get("blob")) + session = Session.from_dict(session_data) + # Migrate: create new event with metadata, delete old + self.create_session(session) + self.memory_client.gmdp_client.delete_event( + memoryId=self.config.memory_id, + actorId=legacy_actor_id, + sessionId=session_id, + eventId=old_event.get("eventId"), + ) + logger.info("Migrated legacy session event for session: %s", session_id) + return session - session_data = json.loads(events[0].get("payload", {})[0].get("blob")) - return Session.from_dict(session_data) + return None def delete_session(self, session_id: str, **kwargs: Any) -> None: """Delete session and all associated data. @@ -250,12 +266,16 @@ def create_agent(self, session_id: str, session_agent: SessionAgent, **kwargs: A event = self.memory_client.gmdp_client.create_event( memoryId=self.config.memory_id, - actorId=self._get_full_agent_id(session_agent.agent_id), + actorId=self.config.actor_id, sessionId=self.session_id, payload=[ {"blob": json.dumps(session_agent.to_dict())}, ], eventTimestamp=self._get_monotonic_timestamp(), + metadata={ + STATE_TYPE_KEY: {"stringValue": StateType.AGENT.value}, + AGENT_ID_KEY: {"stringValue": session_agent.agent_id}, + }, ) logger.info( "Created agent: %s in session: %s with event %s", @@ -280,18 +300,56 @@ def read_agent(self, session_id: str, agent_id: str, **kwargs: Any) -> Optional[ if session_id != self.config.session_id: return None try: + # 1. Try new approach (metadata filter) + event_metadata = [ + EventMetadataFilter.build_expression( + left_operand=LeftExpression.build(STATE_TYPE_KEY), + operator=OperatorType.EQUALS_TO, + right_operand=RightExpression.build(StateType.AGENT.value), + ), + EventMetadataFilter.build_expression( + left_operand=LeftExpression.build(AGENT_ID_KEY), + operator=OperatorType.EQUALS_TO, + right_operand=RightExpression.build(agent_id), + ), + ] + events = self.memory_client.list_events( memory_id=self.config.memory_id, - actor_id=self._get_full_agent_id(agent_id), + actor_id=self.config.actor_id, session_id=session_id, + event_metadata=event_metadata, max_results=1, ) - if not events: - return None + if events: + agent_data = json.loads(events[0].get("payload", {})[0].get("blob")) + return SessionAgent.from_dict(agent_data) - agent_data = json.loads(events[0].get("payload", {})[0].get("blob")) - return SessionAgent.from_dict(agent_data) + # 2. Fallback: check for legacy event and migrate + legacy_actor_id = f"{LEGACY_AGENT_PREFIX}{agent_id}" + events = self.memory_client.list_events( + memory_id=self.config.memory_id, + actor_id=legacy_actor_id, + session_id=session_id, + max_results=1, + ) + if events: + old_event = events[0] + agent_data = json.loads(old_event.get("payload", {})[0].get("blob")) + agent = SessionAgent.from_dict(agent_data) + # Migrate: create new event with metadata, delete old + self.create_agent(session_id, agent) + self.memory_client.gmdp_client.delete_event( + memoryId=self.config.memory_id, + actorId=legacy_actor_id, + sessionId=session_id, + eventId=old_event.get("eventId"), + ) + logger.info("Migrated legacy agent event for agent: %s", agent_id) + return agent + + return None except Exception as e: logger.error("Failed to read agent %s", e) return None diff --git a/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py b/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py index a01973c..918571f 100644 --- a/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py +++ b/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py @@ -160,6 +160,37 @@ def test_read_session_invalid(self, session_manager): assert result is None + def test_read_session_legacy_migration(self, session_manager, mock_memory_client): + """Test reading a legacy session event triggers migration.""" + legacy_session_data = '{"session_id": "test-session-456", "session_type": "AGENT"}' + + # First call (new approach with metadata) returns empty + # Second call (legacy actor_id) returns the legacy event + mock_memory_client.list_events.side_effect = [ + [], # New approach returns nothing + [{"eventId": "legacy-event-1", "payload": [{"blob": legacy_session_data}]}], # Legacy approach + ] + mock_memory_client.gmdp_client.create_event.return_value = {"event": {"eventId": "new-event-1"}} + + result = session_manager.read_session("test-session-456") + + # Verify session was returned + assert result is not None + assert result.session_id == "test-session-456" + assert result.session_type == SessionType.AGENT + + # Verify migration: new event created with metadata + mock_memory_client.gmdp_client.create_event.assert_called_once() + create_call_kwargs = mock_memory_client.gmdp_client.create_event.call_args.kwargs + assert "metadata" in create_call_kwargs + assert create_call_kwargs["metadata"]["stateType"]["stringValue"] == "SESSION" + + # Verify migration: old event deleted + mock_memory_client.gmdp_client.delete_event.assert_called_once() + delete_call_kwargs = mock_memory_client.gmdp_client.delete_event.call_args.kwargs + assert delete_call_kwargs["actorId"] == "session_test-session-456" + assert delete_call_kwargs["eventId"] == "legacy-event-1" + def test_create_agent(self, session_manager): """Test creating an agent.""" session_agent = SessionAgent(agent_id="test-agent-123", state={}, conversation_manager_state={}) @@ -198,6 +229,37 @@ def test_read_agent_no_events(self, session_manager, mock_memory_client): assert result is None + def test_read_agent_legacy_migration(self, session_manager, mock_memory_client): + """Test reading a legacy agent event triggers migration.""" + legacy_agent_data = '{"agent_id": "test-agent-123", "state": {}, "conversation_manager_state": {}}' + + # First call (new approach with metadata) returns empty + # Second call (legacy actor_id) returns the legacy event + mock_memory_client.list_events.side_effect = [ + [], # New approach returns nothing + [{"eventId": "legacy-agent-event-1", "payload": [{"blob": legacy_agent_data}]}], # Legacy approach + ] + mock_memory_client.gmdp_client.create_event.return_value = {"event": {"eventId": "new-agent-event-1"}} + + result = session_manager.read_agent("test-session-456", "test-agent-123") + + # Verify agent was returned + assert result is not None + assert result.agent_id == "test-agent-123" + + # Verify migration: new event created with metadata + mock_memory_client.gmdp_client.create_event.assert_called_once() + create_call_kwargs = mock_memory_client.gmdp_client.create_event.call_args.kwargs + assert "metadata" in create_call_kwargs + assert create_call_kwargs["metadata"]["stateType"]["stringValue"] == "AGENT" + assert create_call_kwargs["metadata"]["agentId"]["stringValue"] == "test-agent-123" + + # Verify migration: old event deleted + mock_memory_client.gmdp_client.delete_event.assert_called_once() + delete_call_kwargs = mock_memory_client.gmdp_client.delete_event.call_args.kwargs + assert delete_call_kwargs["actorId"] == "agent_test-agent-123" + assert delete_call_kwargs["eventId"] == "legacy-agent-event-1" + def test_create_message(self, session_manager, mock_memory_client): """Test creating a message.""" mock_memory_client.create_event.return_value = {"eventId": "event-123"} @@ -925,22 +987,6 @@ def test_init_with_boto_config(self, agentcore_config, mock_memory_client): manager = AgentCoreMemorySessionManager(agentcore_config, boto_client_config=boto_config) assert manager.memory_client is not None - def test_get_full_session_id_conflict(self, session_manager): - """Test session ID conflict with actor ID.""" - # Set up a scenario where session ID would conflict with actor ID - session_manager.config.actor_id = "session_test-session" - - with pytest.raises(SessionException, match="Cannot have session"): - session_manager._get_full_session_id("test-session") - - def test_get_full_agent_id_conflict(self, session_manager): - """Test agent ID conflict with actor ID.""" - # Set up a scenario where agent ID would conflict with actor ID - session_manager.config.actor_id = "agent_test-agent" - - with pytest.raises(SessionException, match="Cannot create agent"): - session_manager._get_full_agent_id("test-agent") - def test_retrieve_customer_context_no_messages(self, agentcore_config_with_retrieval, mock_memory_client): """Test retrieve_customer_context with no messages.""" with patch( From b659eebd27e8218df13cde93b1e1098fa7ffc504 Mon Sep 17 00:00:00 2001 From: "T.J Ariyawansa" Date: Sat, 31 Jan 2026 13:51:40 -0500 Subject: [PATCH 2/3] fix(memory): handle eventual consistency and pagination with metadata filters - Fix pagination bug in list_events where API returns nextToken even with 0 results, causing "metadata filter mismatch" error on subsequent page - Add _retry_with_backoff method for handling eventual consistency when reading newly created agents via metadata filter - Track created agent IDs to handle updates during consistency window - Update test to account for retry behavior in legacy migration --- src/bedrock_agentcore/memory/client.py | 3 +- .../integrations/strands/session_manager.py | 67 +++++++++++++++++-- .../test_agentcore_memory_session_manager.py | 11 +-- 3 files changed, 72 insertions(+), 9 deletions(-) diff --git a/src/bedrock_agentcore/memory/client.py b/src/bedrock_agentcore/memory/client.py index eb0d6d4..86c2884 100644 --- a/src/bedrock_agentcore/memory/client.py +++ b/src/bedrock_agentcore/memory/client.py @@ -860,7 +860,8 @@ def list_events( all_events.extend(events) next_token = response.get("nextToken") - if not next_token or len(all_events) >= max_results: + # Break if: no more pages, reached max, or current page was empty (filter found nothing) + if not next_token or len(all_events) >= max_results or not events: break logger.info("Retrieved total of %d events", len(all_events)) diff --git a/src/bedrock_agentcore/memory/integrations/strands/session_manager.py b/src/bedrock_agentcore/memory/integrations/strands/session_manager.py index 502586c..00936e6 100644 --- a/src/bedrock_agentcore/memory/integrations/strands/session_manager.py +++ b/src/bedrock_agentcore/memory/integrations/strands/session_manager.py @@ -3,10 +3,11 @@ import json import logging import threading +import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timedelta, timezone from enum import Enum -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar import boto3 from botocore.config import Config as BotocoreConfig @@ -95,6 +96,53 @@ def _get_monotonic_timestamp(cls, desired_timestamp: Optional[datetime] = None) cls._last_timestamp = new_timestamp return new_timestamp + def _retry_with_backoff( + self, + func: Callable[..., Any], + *args: Any, + max_retries: int = 3, + base_delay: float = 0.1, + **kwargs: Any, + ) -> Any: + """Execute a function with exponential backoff retry logic. + + Handles eventual consistency by retrying operations that return empty results + or raise throttling exceptions. + + Args: + func: The function to execute. + *args: Positional arguments to pass to the function. + max_retries: Maximum number of retry attempts (default: 3). + base_delay: Base delay in seconds between retries (default: 0.1). + **kwargs: Keyword arguments to pass to the function. + + Returns: + The result of the function call. + + Raises: + The last exception if all retries are exhausted. + """ + last_exception = None + for attempt in range(max_retries): + try: + result = func(*args, **kwargs) + # For list operations, retry if empty (eventual consistency) + if isinstance(result, list) and len(result) == 0 and attempt < max_retries - 1: + time.sleep(base_delay * (attempt + 1)) + continue + return result + except Exception as e: + last_exception = e + error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "") + if error_code == "ThrottlingException" and attempt < max_retries - 1: + time.sleep(base_delay * (attempt + 1)) + continue + raise + # Return empty list if we exhausted retries on empty results + if last_exception is None: + return [] + raise last_exception + def __init__( self, agentcore_memory_config: AgentCoreMemoryConfig, @@ -117,6 +165,8 @@ def __init__( self.memory_client = MemoryClient(region_name=region_name) session = boto_session or boto3.Session(region_name=region_name) self.has_existing_agent = False + # Track created agents to handle eventual consistency (agent may not be immediately queryable) + self._created_agent_ids: set[str] = set() # Override the clients if custom boto session or config is provided # Add strands-agents to the request user agent @@ -277,6 +327,8 @@ def create_agent(self, session_id: str, session_agent: SessionAgent, **kwargs: A AGENT_ID_KEY: {"stringValue": session_agent.agent_id}, }, ) + # Track created agent for eventual consistency handling + self._created_agent_ids.add(session_agent.agent_id) logger.info( "Created agent: %s in session: %s with event %s", session_agent.agent_id, @@ -314,7 +366,9 @@ def read_agent(self, session_id: str, agent_id: str, **kwargs: Any) -> Optional[ ), ] - events = self.memory_client.list_events( + # Use retry with backoff to handle eventual consistency + events = self._retry_with_backoff( + self.memory_client.list_events, memory_id=self.config.memory_id, actor_id=self.config.actor_id, session_id=session_id, @@ -368,9 +422,14 @@ def update_agent(self, session_id: str, session_agent: SessionAgent, **kwargs: A agent_id = session_agent.agent_id previous_agent = self.read_agent(session_id=session_id, agent_id=agent_id) if previous_agent is None: - raise SessionException(f"Agent {agent_id} in session {session_id} does not exist") + # Handle eventual consistency: if we just created this agent but can't read it yet + if agent_id not in self._created_agent_ids: + raise SessionException(f"Agent {agent_id} in session {session_id} does not exist") + # Agent was created but not yet queryable - proceed without updating created_at + logger.debug("Agent %s not yet queryable due to eventual consistency, proceeding with update", agent_id) + else: + session_agent.created_at = previous_agent.created_at - session_agent.created_at = previous_agent.created_at # Create a new agent as AgentCore Memory is immutable. We always get the latest one in `read_agent` self.create_agent(session_id, session_agent) diff --git a/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py b/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py index 918571f..5461da7 100644 --- a/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py +++ b/tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py @@ -229,14 +229,17 @@ def test_read_agent_no_events(self, session_manager, mock_memory_client): assert result is None - def test_read_agent_legacy_migration(self, session_manager, mock_memory_client): + @patch("bedrock_agentcore.memory.integrations.strands.session_manager.time.sleep") + def test_read_agent_legacy_migration(self, mock_sleep, session_manager, mock_memory_client): """Test reading a legacy agent event triggers migration.""" legacy_agent_data = '{"agent_id": "test-agent-123", "state": {}, "conversation_manager_state": {}}' - # First call (new approach with metadata) returns empty - # Second call (legacy actor_id) returns the legacy event + # New approach with metadata is retried 3 times (all return empty) + # Then legacy actor_id approach returns the legacy event mock_memory_client.list_events.side_effect = [ - [], # New approach returns nothing + [], # New approach - attempt 1 + [], # New approach - attempt 2 + [], # New approach - attempt 3 [{"eventId": "legacy-agent-event-1", "payload": [{"blob": legacy_agent_data}]}], # Legacy approach ] mock_memory_client.gmdp_client.create_event.return_value = {"event": {"eventId": "new-agent-event-1"}} From 6f6a448912ef7275eb8b6d68cbeeb4a44c68ba5d Mon Sep 17 00:00:00 2001 From: "T.J Ariyawansa" Date: Fri, 6 Feb 2026 10:11:36 -0500 Subject: [PATCH 3/3] fix: Fixed pagination bug with nextToken --- src/bedrock_agentcore/memory/client.py | 4 +- .../integrations/strands/session_manager.py | 62 +------------------ 2 files changed, 4 insertions(+), 62 deletions(-) diff --git a/src/bedrock_agentcore/memory/client.py b/src/bedrock_agentcore/memory/client.py index 86c2884..e03fcb0 100644 --- a/src/bedrock_agentcore/memory/client.py +++ b/src/bedrock_agentcore/memory/client.py @@ -860,8 +860,8 @@ def list_events( all_events.extend(events) next_token = response.get("nextToken") - # Break if: no more pages, reached max, or current page was empty (filter found nothing) - if not next_token or len(all_events) >= max_results or not events: + # Break if: no more pages or reached max + if not next_token or len(all_events) >= max_results: break logger.info("Retrieved total of %d events", len(all_events)) diff --git a/src/bedrock_agentcore/memory/integrations/strands/session_manager.py b/src/bedrock_agentcore/memory/integrations/strands/session_manager.py index 00936e6..09e886d 100644 --- a/src/bedrock_agentcore/memory/integrations/strands/session_manager.py +++ b/src/bedrock_agentcore/memory/integrations/strands/session_manager.py @@ -96,53 +96,6 @@ def _get_monotonic_timestamp(cls, desired_timestamp: Optional[datetime] = None) cls._last_timestamp = new_timestamp return new_timestamp - def _retry_with_backoff( - self, - func: Callable[..., Any], - *args: Any, - max_retries: int = 3, - base_delay: float = 0.1, - **kwargs: Any, - ) -> Any: - """Execute a function with exponential backoff retry logic. - - Handles eventual consistency by retrying operations that return empty results - or raise throttling exceptions. - - Args: - func: The function to execute. - *args: Positional arguments to pass to the function. - max_retries: Maximum number of retry attempts (default: 3). - base_delay: Base delay in seconds between retries (default: 0.1). - **kwargs: Keyword arguments to pass to the function. - - Returns: - The result of the function call. - - Raises: - The last exception if all retries are exhausted. - """ - last_exception = None - for attempt in range(max_retries): - try: - result = func(*args, **kwargs) - # For list operations, retry if empty (eventual consistency) - if isinstance(result, list) and len(result) == 0 and attempt < max_retries - 1: - time.sleep(base_delay * (attempt + 1)) - continue - return result - except Exception as e: - last_exception = e - error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "") - if error_code == "ThrottlingException" and attempt < max_retries - 1: - time.sleep(base_delay * (attempt + 1)) - continue - raise - # Return empty list if we exhausted retries on empty results - if last_exception is None: - return [] - raise last_exception - def __init__( self, agentcore_memory_config: AgentCoreMemoryConfig, @@ -165,10 +118,7 @@ def __init__( self.memory_client = MemoryClient(region_name=region_name) session = boto_session or boto3.Session(region_name=region_name) self.has_existing_agent = False - # Track created agents to handle eventual consistency (agent may not be immediately queryable) - self._created_agent_ids: set[str] = set() - # Override the clients if custom boto session or config is provided # Add strands-agents to the request user agent if boto_client_config: existing_user_agent = getattr(boto_client_config, "user_agent_extra", None) @@ -327,8 +277,6 @@ def create_agent(self, session_id: str, session_agent: SessionAgent, **kwargs: A AGENT_ID_KEY: {"stringValue": session_agent.agent_id}, }, ) - # Track created agent for eventual consistency handling - self._created_agent_ids.add(session_agent.agent_id) logger.info( "Created agent: %s in session: %s with event %s", session_agent.agent_id, @@ -366,9 +314,7 @@ def read_agent(self, session_id: str, agent_id: str, **kwargs: Any) -> Optional[ ), ] - # Use retry with backoff to handle eventual consistency - events = self._retry_with_backoff( - self.memory_client.list_events, + events = self.memory_client.list_events( memory_id=self.config.memory_id, actor_id=self.config.actor_id, session_id=session_id, @@ -422,11 +368,7 @@ def update_agent(self, session_id: str, session_agent: SessionAgent, **kwargs: A agent_id = session_agent.agent_id previous_agent = self.read_agent(session_id=session_id, agent_id=agent_id) if previous_agent is None: - # Handle eventual consistency: if we just created this agent but can't read it yet - if agent_id not in self._created_agent_ids: - raise SessionException(f"Agent {agent_id} in session {session_id} does not exist") - # Agent was created but not yet queryable - proceed without updating created_at - logger.debug("Agent %s not yet queryable due to eventual consistency, proceeding with update", agent_id) + raise SessionException(f"Agent {agent_id} in session {session_id} does not exist") else: session_agent.created_at = previous_agent.created_at