diff --git a/src/google/adk/sessions/database_session_service.py b/src/google/adk/sessions/database_session_service.py index eed1d9eae6..c8e044001a 100644 --- a/src/google/adk/sessions/database_session_service.py +++ b/src/google/adk/sessions/database_session_service.py @@ -536,11 +536,7 @@ async def append_event(self, session: Session, event: Event) -> Event: is_sqlite = self.db_engine.dialect.name == _SQLITE_DIALECT use_row_level_locking = self._supports_row_level_locking() - state_delta = ( - event.actions.state_delta - if event.actions and event.actions.state_delta - else {} - ) + state_delta = event.actions.state_delta if event.actions.state_delta else {} state_deltas = _session_util.extract_state_delta(state_delta) has_app_delta = bool(state_deltas["app"]) has_user_delta = bool(state_deltas["user"]) @@ -564,6 +560,18 @@ async def append_event(self, session: Session, event: Event) -> Event: if storage_session is None: raise ValueError(f"Session {session.id} not found.") + # Pre-analyze state deltas to determine which scopes actually need + # write locks. Most events carry only session-scoped state (or no + # state at all), so acquiring FOR UPDATE on app_states / user_states + # unnecessarily serializes all concurrent append_event calls. + state_deltas = ( + _session_util.extract_state_delta(event.actions.state_delta) + if event.actions.state_delta + else None + ) + has_app_delta = bool(state_deltas and state_deltas.get("app")) + has_user_delta = bool(state_deltas and state_deltas.get("user")) + storage_app_state = await _select_required_state( sql_session=sql_session, state_model=schema.StorageAppState, @@ -622,7 +630,7 @@ async def append_event(self, session: Session, event: Event) -> Event: storage_user_state.state = ( storage_user_state.state | state_deltas["user"] ) - if state_deltas["session"]: + if state_deltas and state_deltas["session"]: storage_session.state = ( storage_session.state | state_deltas["session"] )