Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/google/adk/sessions/database_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,11 +536,7 @@ async def append_event(self, session: Session, event: Event) -> Event:
is_sqlite = self.db_engine.dialect.name == _SQLITE_DIALECT
use_row_level_locking = self._supports_row_level_locking()

state_delta = (
event.actions.state_delta
if event.actions and event.actions.state_delta
else {}
)
state_delta = event.actions.state_delta if event.actions.state_delta else {}
state_deltas = _session_util.extract_state_delta(state_delta)
has_app_delta = bool(state_deltas["app"])
has_user_delta = bool(state_deltas["user"])
Expand All @@ -564,6 +560,18 @@ async def append_event(self, session: Session, event: Event) -> Event:
if storage_session is None:
raise ValueError(f"Session {session.id} not found.")

# Pre-analyze state deltas to determine which scopes actually need
# write locks. Most events carry only session-scoped state (or no
# state at all), so acquiring FOR UPDATE on app_states / user_states
# unnecessarily serializes all concurrent append_event calls.
state_deltas = (
_session_util.extract_state_delta(event.actions.state_delta)
if event.actions.state_delta
else None
)
has_app_delta = bool(state_deltas and state_deltas.get("app"))
has_user_delta = bool(state_deltas and state_deltas.get("user"))

storage_app_state = await _select_required_state(
sql_session=sql_session,
state_model=schema.StorageAppState,
Expand Down Expand Up @@ -622,7 +630,7 @@ async def append_event(self, session: Session, event: Event) -> Event:
storage_user_state.state = (
storage_user_state.state | state_deltas["user"]
)
if state_deltas["session"]:
if state_deltas and state_deltas["session"]:
storage_session.state = (
storage_session.state | state_deltas["session"]
)
Expand Down