-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Feature/clone session #4191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/clone session #4191
Changes from 7 commits
3ec3eec
0c04652
312cf08
063c066
12b994c
31d0858
cb1e3a6
d67baec
42e7d36
608979b
74dd8e7
6f38fc2
8d05a3f
bc8839a
d4f4e9b
172c1d1
03f12b7
31a936b
22f3a70
c3fa471
38fdf64
c821d70
12cee90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -415,6 +415,118 @@ async def delete_session( | |
| await sql_session.execute(stmt) | ||
| await sql_session.commit() | ||
|
|
||
| @override | ||
| async def clone_session( | ||
| self, | ||
| *, | ||
| app_name: str, | ||
| src_user_id: str, | ||
| src_session_id: Optional[str] = None, | ||
| new_user_id: Optional[str] = None, | ||
| new_session_id: Optional[str] = None, | ||
| ) -> Session: | ||
| await self._prepare_tables() | ||
|
|
||
| # Use source values as defaults | ||
| new_user_id = new_user_id or src_user_id | ||
|
|
||
| schema = self._get_schema_classes() | ||
|
|
||
| # Collect source sessions and their events | ||
| source_sessions = [] | ||
| if src_session_id: | ||
| # Single session clone - use get_session (no N+1 issue) | ||
| session = await self.get_session( | ||
| app_name=app_name, | ||
| user_id=src_user_id, | ||
| session_id=src_session_id, | ||
| ) | ||
| if not session: | ||
| raise ValueError( | ||
| f"Source session {src_session_id} not found for user" | ||
| f" {src_user_id}." | ||
| ) | ||
| source_sessions.append(session) | ||
| else: | ||
| # All sessions clone - optimized to avoid N+1 query problem | ||
| # Step 1: Get all sessions with state (no events) | ||
| list_response = await self.list_sessions( | ||
| app_name=app_name, user_id=src_user_id | ||
| ) | ||
| if not list_response.sessions: | ||
| raise ValueError(f"No sessions found for user {src_user_id}.") | ||
|
|
||
| session_ids = [sess.id for sess in list_response.sessions] | ||
|
|
||
| # Step 2: Fetch ALL events for all session IDs in a single query | ||
| async with self.database_session_factory() as sql_session: | ||
| stmt = ( | ||
| select(schema.StorageEvent) | ||
| .filter(schema.StorageEvent.app_name == app_name) | ||
| .filter(schema.StorageEvent.user_id == src_user_id) | ||
| .filter(schema.StorageEvent.session_id.in_(session_ids)) | ||
| .order_by(schema.StorageEvent.timestamp.asc()) | ||
| ) | ||
| result = await sql_session.execute(stmt) | ||
| all_storage_events = result.scalars().all() | ||
|
|
||
| # Step 3: Map events back to sessions | ||
| events_by_session_id = {} | ||
| for storage_event in all_storage_events: | ||
| sid = storage_event.session_id | ||
| if sid not in events_by_session_id: | ||
| events_by_session_id[sid] = [] | ||
| events_by_session_id[sid].append(storage_event.to_event()) | ||
|
|
||
| # Build full session objects with events | ||
| for sess in list_response.sessions: | ||
| sess.events = events_by_session_id.get(sess.id, []) | ||
| source_sessions.append(sess) | ||
|
|
||
| # Sort sessions by update time for deterministic state merging | ||
| source_sessions.sort(key=lambda s: s.last_update_time) | ||
|
|
||
| # Merge states from all source sessions | ||
| merged_state = {} | ||
| for session in source_sessions: | ||
| merged_state.update(copy.deepcopy(session.state)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The merging of states from source sessions is non-deterministic. The # Sort sessions by update time for deterministic state merging
source_sessions.sort(key=lambda s: s.last_update_time)
# Merge states from all source sessions
merged_state = {}
for session in source_sessions:
merged_state.update(copy.deepcopy(session.state))
lwangverizon marked this conversation as resolved.
Outdated
|
||
|
|
||
| # Create the new session (new_session_id=None triggers UUID4 generation) | ||
| new_session = await self.create_session( | ||
| app_name=app_name, | ||
| user_id=new_user_id, | ||
| state=merged_state, | ||
| session_id=new_session_id, | ||
| ) | ||
|
|
||
| # Collect all events, sort by timestamp, then deduplicate | ||
| # to ensure chronological "first occurrence wins" | ||
| all_source_events = [] | ||
| for session in source_sessions: | ||
| all_source_events.extend(session.events) | ||
| all_source_events.sort(key=lambda e: e.timestamp) | ||
|
|
||
| all_events = [] | ||
| seen_event_ids = set() | ||
| for event in all_source_events: | ||
| if event.id in seen_event_ids: | ||
| continue | ||
| seen_event_ids.add(event.id) | ||
| all_events.append(event) | ||
|
|
||
| # Copy events to the new session using bulk insert | ||
| async with self.database_session_factory() as sql_session: | ||
| new_storage_events = [ | ||
| schema.StorageEvent.from_event(new_session, copy.deepcopy(event)) | ||
| for event in all_events | ||
| ] | ||
| sql_session.add_all(new_storage_events) | ||
| await sql_session.commit() | ||
|
lwangverizon marked this conversation as resolved.
lwangverizon marked this conversation as resolved.
|
||
|
|
||
| # Return the new session with events (avoid redundant DB query) | ||
| new_session.events = all_events | ||
| return new_session | ||
|
|
||
| @override | ||
| async def append_event(self, session: Session, event: Event) -> Event: | ||
| await self._prepare_tables() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -286,6 +286,117 @@ def _delete_session_impl( | |
|
|
||
| self.sessions[app_name][user_id].pop(session_id) | ||
|
|
||
| @override | ||
| async def clone_session( | ||
| self, | ||
| *, | ||
| app_name: str, | ||
| src_user_id: str, | ||
| src_session_id: Optional[str] = None, | ||
| new_user_id: Optional[str] = None, | ||
| new_session_id: Optional[str] = None, | ||
| ) -> Session: | ||
| return self._clone_session_impl( | ||
| app_name=app_name, | ||
| src_user_id=src_user_id, | ||
| src_session_id=src_session_id, | ||
| new_user_id=new_user_id, | ||
| new_session_id=new_session_id, | ||
| ) | ||
|
|
||
| def _clone_session_impl( | ||
| self, | ||
| *, | ||
| app_name: str, | ||
| src_user_id: str, | ||
| src_session_id: Optional[str] = None, | ||
| new_user_id: Optional[str] = None, | ||
| new_session_id: Optional[str] = None, | ||
| ) -> Session: | ||
| # Use source values as defaults | ||
| new_user_id = new_user_id or src_user_id | ||
|
|
||
| # Collect source sessions and their events | ||
| source_sessions = [] | ||
| if src_session_id: | ||
| # Single session clone | ||
| session = self._get_session_impl( | ||
| app_name=app_name, | ||
| user_id=src_user_id, | ||
| session_id=src_session_id, | ||
| ) | ||
| if not session: | ||
| raise ValueError( | ||
| f'Source session {src_session_id} not found for user' | ||
| f' {src_user_id}.' | ||
| ) | ||
| source_sessions.append(session) | ||
| else: | ||
| # All sessions clone - optimized direct access to avoid N+1 lookups | ||
| if ( | ||
| app_name not in self.sessions | ||
| or src_user_id not in self.sessions[app_name] | ||
| ): | ||
| raise ValueError(f'No sessions found for user {src_user_id}.') | ||
|
|
||
| user_sessions = self.sessions[app_name][src_user_id] | ||
| if not user_sessions: | ||
| raise ValueError(f'No sessions found for user {src_user_id}.') | ||
|
|
||
| # Directly access storage sessions and build full session objects | ||
| for session_id, storage_session in user_sessions.items(): | ||
| # Deep copy the session to avoid mutations | ||
| copied_session = copy.deepcopy(storage_session) | ||
| # Merge state with app and user state | ||
| copied_session = self._merge_state(app_name, src_user_id, copied_session) | ||
| source_sessions.append(copied_session) | ||
|
|
||
| # Sort sessions by update time for deterministic state merging | ||
| source_sessions.sort(key=lambda s: s.last_update_time) | ||
|
|
||
| # Merge states from all source sessions | ||
| merged_state = {} | ||
| for session in source_sessions: | ||
| merged_state.update(copy.deepcopy(session.state)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The merging of states from source sessions can be non-deterministic. When cloning all sessions, # Sort sessions by update time for deterministic state merging
source_sessions.sort(key=lambda s: s.last_update_time)
# Merge states from all source sessions
merged_state = {}
for session in source_sessions:
merged_state.update(copy.deepcopy(session.state))
lwangverizon marked this conversation as resolved.
Outdated
|
||
|
|
||
| # Create the new session (new_session_id=None triggers UUID4 generation) | ||
| new_session = self._create_session_impl( | ||
| app_name=app_name, | ||
| user_id=new_user_id, | ||
| state=merged_state, | ||
| session_id=new_session_id, | ||
| ) | ||
|
|
||
| # Collect all events, sort by timestamp, then deduplicate | ||
| # to ensure chronological "first occurrence wins" | ||
| all_source_events = [] | ||
| for session in source_sessions: | ||
| all_source_events.extend(session.events) | ||
| all_source_events.sort(key=lambda e: e.timestamp) | ||
|
|
||
| all_events = [] | ||
| seen_event_ids = set() | ||
| for event in all_source_events: | ||
| if event.id in seen_event_ids: | ||
| continue | ||
| seen_event_ids.add(event.id) | ||
| all_events.append(copy.deepcopy(event)) | ||
|
|
||
| # Get latest update time from sorted sessions | ||
| latest_update_time = ( | ||
| source_sessions[-1].last_update_time if source_sessions else 0.0 | ||
| ) | ||
|
lwangverizon marked this conversation as resolved.
lwangverizon marked this conversation as resolved.
|
||
|
|
||
| # Get the storage session and set events | ||
| storage_session = self.sessions[app_name][new_user_id][new_session.id] | ||
| storage_session.events = all_events | ||
| storage_session.last_update_time = latest_update_time | ||
|
|
||
| # Return the new session with events (avoid redundant lookup) | ||
| new_session.events = all_events | ||
| new_session.last_update_time = latest_update_time | ||
| return new_session | ||
|
|
||
| @override | ||
| async def append_event(self, session: Session, event: Event) -> Event: | ||
| if event.partial: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.