Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3ec3eec
add clone session function
lwangverizon Jan 17, 2026
0c04652
add new scenariors
lwangverizon Jan 17, 2026
312cf08
add dedup
lwangverizon Jan 17, 2026
063c066
address feedback
lwangverizon Jan 17, 2026
12b994c
address n+1 query problem
lwangverizon Jan 18, 2026
31d0858
n+1 fix
lwangverizon Jan 18, 2026
cb1e3a6
add sorting
lwangverizon Jan 18, 2026
d67baec
consolidate duplicate logic to _prepare_sessions_for_cloning helper f…
lwangverizon Jan 18, 2026
42e7d36
explicit max calculation
lwangverizon Jan 18, 2026
608979b
Update src/google/adk/sessions/vertex_ai_session_service.py
lwangverizon Jan 18, 2026
74dd8e7
Update src/google/adk/sessions/sqlite_session_service.py
lwangverizon Jan 18, 2026
6f38fc2
Update src/google/adk/sessions/database_session_service.py
lwangverizon Jan 18, 2026
8d05a3f
Update src/google/adk/sessions/base_session_service.py
lwangverizon Jan 18, 2026
bc8839a
format clone_session only
lwangverizon Jan 18, 2026
d4f4e9b
consolidate test cases
lwangverizon Jan 18, 2026
172c1d1
Merge remote-tracking branch 'upstream/main' into feature/clone-session
lwangverizon Jan 22, 2026
03f12b7
improve deduplication logic
lwangverizon Jan 22, 2026
31a936b
Merge remote-tracking branch 'upstream/main' into feature/clone-session
lwangverizon Jan 27, 2026
22f3a70
Merge remote-tracking branch 'upstream/main' into feature/clone-session
lwangverizon Jan 28, 2026
c3fa471
sync with main
lwangverizon Feb 4, 2026
38fdf64
Merge branch 'main' into feature/clone-session
lwangverizon Feb 4, 2026
c821d70
add support for adk web
lwangverizon Feb 7, 2026
12cee90
Merge branch 'main' into feature/clone-session
lwangverizon Feb 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/google/adk/sessions/base_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,48 @@ async def delete_session(
) -> None:
"""Deletes a session."""

@abc.abstractmethod
async def clone_session(
self,
*,
app_name: str,
src_user_id: str,
src_session_id: Optional[str] = None,
new_user_id: Optional[str] = None,
new_session_id: Optional[str] = None,
) -> Session:
"""Clones session(s) and their events to a new session.

This method supports two modes:

1. Single session clone: When `src_session_id` is provided, clones that
specific session to the new session.

2. All sessions clone: When `src_session_id` is NOT provided, finds all
sessions for `src_user_id` and merges ALL their events into a single
new session.

Events are automatically deduplicated by event ID - only the first
occurrence of each event ID is kept.

Args:
app_name: The name of the app.
src_user_id: The source user ID whose session(s) to clone.
src_session_id: The source session ID to clone. If not provided, all
sessions for the source user will be merged into one new session.
new_user_id: The user ID for the new session. If not provided, uses the
same user_id as the source.
new_session_id: The session ID for the new session. If not provided, a
new ID will be auto-generated (UUID4).

Returns:
The newly created session with cloned events.

Raises:
ValueError: If no source sessions are found.
AlreadyExistsError: If a session with new_session_id already exists.
"""

async def append_event(self, session: Session, event: Event) -> Event:
"""Appends an event to a session object."""
if event.partial:
Expand Down
112 changes: 112 additions & 0 deletions src/google/adk/sessions/database_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,118 @@ async def delete_session(
await sql_session.execute(stmt)
await sql_session.commit()

@override
async def clone_session(
self,
*,
app_name: str,
src_user_id: str,
src_session_id: Optional[str] = None,
new_user_id: Optional[str] = None,
new_session_id: Optional[str] = None,
) -> Session:
await self._prepare_tables()

# Use source values as defaults
new_user_id = new_user_id or src_user_id

schema = self._get_schema_classes()

# Collect source sessions and their events
source_sessions = []
if src_session_id:
# Single session clone - use get_session (no N+1 issue)
session = await self.get_session(
app_name=app_name,
user_id=src_user_id,
session_id=src_session_id,
)
if not session:
raise ValueError(
f"Source session {src_session_id} not found for user"
f" {src_user_id}."
)
source_sessions.append(session)
else:
# All sessions clone - optimized to avoid N+1 query problem
# Step 1: Get all sessions with state (no events)
list_response = await self.list_sessions(
app_name=app_name, user_id=src_user_id
)
if not list_response.sessions:
raise ValueError(f"No sessions found for user {src_user_id}.")

session_ids = [sess.id for sess in list_response.sessions]

# Step 2: Fetch ALL events for all session IDs in a single query
async with self.database_session_factory() as sql_session:
stmt = (
select(schema.StorageEvent)
.filter(schema.StorageEvent.app_name == app_name)
.filter(schema.StorageEvent.user_id == src_user_id)
.filter(schema.StorageEvent.session_id.in_(session_ids))
.order_by(schema.StorageEvent.timestamp.asc())
)
result = await sql_session.execute(stmt)
all_storage_events = result.scalars().all()

# Step 3: Map events back to sessions
events_by_session_id = {}
for storage_event in all_storage_events:
sid = storage_event.session_id
if sid not in events_by_session_id:
events_by_session_id[sid] = []
events_by_session_id[sid].append(storage_event.to_event())
Comment thread
lwangverizon marked this conversation as resolved.
Outdated

# Build full session objects with events
for sess in list_response.sessions:
sess.events = events_by_session_id.get(sess.id, [])
source_sessions.append(sess)

# Sort sessions by update time for deterministic state merging
source_sessions.sort(key=lambda s: s.last_update_time)

# Merge states from all source sessions
merged_state = {}
for session in source_sessions:
merged_state.update(copy.deepcopy(session.state))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The merging of states from source sessions is non-deterministic. The source_sessions list is not ordered, so when merged_state.update() is called in a loop, the final value for any conflicting state keys will depend on the arbitrary order of sessions. According to the PR description, "later sessions [should overwrite] earlier values", which implies a deterministic order. Please sort the source_sessions before merging to ensure deterministic behavior.

    # Sort sessions by update time for deterministic state merging
    source_sessions.sort(key=lambda s: s.last_update_time)
    # Merge states from all source sessions
    merged_state = {}
    for session in source_sessions:
      merged_state.update(copy.deepcopy(session.state))

Comment thread
lwangverizon marked this conversation as resolved.
Outdated

# Create the new session (new_session_id=None triggers UUID4 generation)
new_session = await self.create_session(
app_name=app_name,
user_id=new_user_id,
state=merged_state,
session_id=new_session_id,
)

# Collect all events, sort by timestamp, then deduplicate
# to ensure chronological "first occurrence wins"
all_source_events = []
for session in source_sessions:
all_source_events.extend(session.events)
all_source_events.sort(key=lambda e: e.timestamp)

all_events = []
seen_event_ids = set()
for event in all_source_events:
if event.id in seen_event_ids:
continue
seen_event_ids.add(event.id)
all_events.append(event)

# Copy events to the new session using bulk insert
async with self.database_session_factory() as sql_session:
new_storage_events = [
schema.StorageEvent.from_event(new_session, copy.deepcopy(event))
for event in all_events
]
sql_session.add_all(new_storage_events)
await sql_session.commit()
Comment thread
lwangverizon marked this conversation as resolved.
Comment thread
lwangverizon marked this conversation as resolved.

# Return the new session with events (avoid redundant DB query)
new_session.events = all_events
return new_session

@override
async def append_event(self, session: Session, event: Event) -> Event:
await self._prepare_tables()
Expand Down
111 changes: 111 additions & 0 deletions src/google/adk/sessions/in_memory_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,117 @@ def _delete_session_impl(

self.sessions[app_name][user_id].pop(session_id)

@override
async def clone_session(
self,
*,
app_name: str,
src_user_id: str,
src_session_id: Optional[str] = None,
new_user_id: Optional[str] = None,
new_session_id: Optional[str] = None,
) -> Session:
return self._clone_session_impl(
app_name=app_name,
src_user_id=src_user_id,
src_session_id=src_session_id,
new_user_id=new_user_id,
new_session_id=new_session_id,
)

def _clone_session_impl(
self,
*,
app_name: str,
src_user_id: str,
src_session_id: Optional[str] = None,
new_user_id: Optional[str] = None,
new_session_id: Optional[str] = None,
) -> Session:
# Use source values as defaults
new_user_id = new_user_id or src_user_id

# Collect source sessions and their events
source_sessions = []
if src_session_id:
# Single session clone
session = self._get_session_impl(
app_name=app_name,
user_id=src_user_id,
session_id=src_session_id,
)
if not session:
raise ValueError(
f'Source session {src_session_id} not found for user'
f' {src_user_id}.'
)
source_sessions.append(session)
else:
# All sessions clone - optimized direct access to avoid N+1 lookups
if (
app_name not in self.sessions
or src_user_id not in self.sessions[app_name]
):
raise ValueError(f'No sessions found for user {src_user_id}.')

user_sessions = self.sessions[app_name][src_user_id]
if not user_sessions:
raise ValueError(f'No sessions found for user {src_user_id}.')

# Directly access storage sessions and build full session objects
for session_id, storage_session in user_sessions.items():
# Deep copy the session to avoid mutations
copied_session = copy.deepcopy(storage_session)
# Merge state with app and user state
copied_session = self._merge_state(app_name, src_user_id, copied_session)
source_sessions.append(copied_session)

# Sort sessions by update time for deterministic state merging
source_sessions.sort(key=lambda s: s.last_update_time)

# Merge states from all source sessions
merged_state = {}
for session in source_sessions:
merged_state.update(copy.deepcopy(session.state))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The merging of states from source sessions can be non-deterministic. When cloning all sessions, source_sessions is populated by iterating over a dictionary. While insertion order is preserved in recent Python versions, it's not guaranteed to be chronological if sessions are deleted and re-added. To ensure deterministic state merging where later sessions overwrite earlier ones, you should sort source_sessions by last_update_time.

    # Sort sessions by update time for deterministic state merging
    source_sessions.sort(key=lambda s: s.last_update_time)
    # Merge states from all source sessions
    merged_state = {}
    for session in source_sessions:
      merged_state.update(copy.deepcopy(session.state))

Comment thread
lwangverizon marked this conversation as resolved.
Outdated

# Create the new session (new_session_id=None triggers UUID4 generation)
new_session = self._create_session_impl(
app_name=app_name,
user_id=new_user_id,
state=merged_state,
session_id=new_session_id,
)

# Collect all events, sort by timestamp, then deduplicate
# to ensure chronological "first occurrence wins"
all_source_events = []
for session in source_sessions:
all_source_events.extend(session.events)
all_source_events.sort(key=lambda e: e.timestamp)

all_events = []
seen_event_ids = set()
for event in all_source_events:
if event.id in seen_event_ids:
continue
seen_event_ids.add(event.id)
all_events.append(copy.deepcopy(event))

# Get latest update time from sorted sessions
latest_update_time = (
source_sessions[-1].last_update_time if source_sessions else 0.0
)
Comment thread
lwangverizon marked this conversation as resolved.
Comment thread
lwangverizon marked this conversation as resolved.

# Get the storage session and set events
storage_session = self.sessions[app_name][new_user_id][new_session.id]
storage_session.events = all_events
storage_session.last_update_time = latest_update_time

# Return the new session with events (avoid redundant lookup)
new_session.events = all_events
new_session.last_update_time = latest_update_time
return new_session

@override
async def append_event(self, session: Session, event: Event) -> Event:
if event.partial:
Expand Down
Loading