88"""
99
1010import asyncio
11- from typing import Any , AsyncGenerator , Optional
11+ import time
12+ from typing import Any , AsyncGenerator , Callable , Coroutine , Optional
1213
1314from pine_ai .models .events import C2SEvent , S2CEvent
1415from pine_ai .transport .socketio import SocketIOManager
1516
16- # Immediate-dispatch event types (Tier 2)
17- IMMEDIATE_EVENTS = {
18- S2CEvent .SESSION_STATE , S2CEvent .SESSION_INPUT_STATE , S2CEvent .SESSION_RICH_CONTENT ,
19- S2CEvent .SESSION_FORM_TO_USER ,
20- S2CEvent .SESSION_ASK_FOR_LOCATION , S2CEvent .SESSION_LOCATION_SELECTION ,
21- S2CEvent .SESSION_REWARD , S2CEvent .SESSION_PAYMENT , S2CEvent .SESSION_TASK_READY ,
17+ TERMINAL_STATES = {"task_finished" , "task_cancelled" , "task_stale" }
18+ DEFAULT_IDLE_TIMEOUT_S = 120.0
19+
20+ # Events that are buffered/debounced — NOT dispatched immediately
21+ BUFFERED_EVENTS = {S2CEvent .SESSION_TEXT_PART , S2CEvent .SESSION_WORK_LOG_PART }
22+
23+ # Substantive response events — track for waiting_input termination gating
24+ SUBSTANTIVE_EVENTS = {
25+ S2CEvent .SESSION_TEXT , S2CEvent .SESSION_FORM_TO_USER ,
26+ S2CEvent .SESSION_ASK_FOR_LOCATION , S2CEvent .SESSION_TASK_READY ,
2227 S2CEvent .SESSION_TASK_FINISHED , S2CEvent .SESSION_INTERACTIVE_AUTH_CONFIRMATION ,
23- S2CEvent .SESSION_THREE_WAY_CALL , S2CEvent .SESSION_ERROR , S2CEvent .SESSION_THINKING ,
24- S2CEvent .SESSION_WORK_LOG , S2CEvent .SESSION_UPDATE_TITLE , S2CEvent .SESSION_TEXT ,
25- S2CEvent .SESSION_MESSAGE_STATUS , S2CEvent .SESSION_CARD , S2CEvent .SESSION_NEXT_TASKS ,
26- S2CEvent .SESSION_CONTINUE_IN_NEW_TASK , S2CEvent .SESSION_SOCIAL_SHARING ,
27- S2CEvent .SESSION_RETRY , S2CEvent .SESSION_DEBUG , S2CEvent .SESSION_ACTION_STATUS ,
28- S2CEvent .SESSION_COMPUTER_USE_INTERVENTION ,
28+ S2CEvent .SESSION_THREE_WAY_CALL , S2CEvent .SESSION_REWARD ,
2929}
3030
3131
@@ -62,8 +62,15 @@ def collect(self, message_id: str, content: str, final: bool) -> Optional[str]:
6262
6363
6464class ChatEngine :
65- def __init__ (self , sio : SocketIOManager ):
65+ def __init__ (
66+ self ,
67+ sio : SocketIOManager ,
68+ check_session_state : Optional [Callable [[str ], Coroutine [Any , Any , dict [str , Any ]]]] = None ,
69+ idle_timeout_s : float = DEFAULT_IDLE_TIMEOUT_S ,
70+ ):
6671 self ._sio = sio
72+ self ._check_session_state = check_session_state
73+ self ._idle_timeout_s = idle_timeout_s
6774
6875 async def join_session (self , session_id : str ) -> dict [str , Any ]:
6976 """Join a session room — spec 5.1.1.
@@ -80,7 +87,12 @@ def leave_session(self, session_id: str) -> None:
8087 self ._sio .emit (C2SEvent .SESSION_LEAVE , None , session_id )
8188
8289 async def chat (
83- self , session_id : str , content : str ,
90+ self ,
91+ session_id : str ,
92+ content : str ,
93+ * ,
94+ attachments : Optional [list [dict [str , Any ]]] = None ,
95+ referenced_sessions : Optional [list [dict [str , str ]]] = None ,
8496 ) -> AsyncGenerator [ChatEvent , None ]:
8597 """Send a message and yield events with stream buffering.
8698 Production handler reads payload.data as {content, attachments, ...}.
@@ -90,22 +102,51 @@ async def chat(
90102 C2SEvent .SESSION_MESSAGE ,
91103 {
92104 "content" : content ,
93- "attachments" : [],
94- "referenced_sessions" : [],
105+ "attachments" : attachments or [],
106+ "referenced_sessions" : referenced_sessions or [],
95107 "client_now_date" : datetime .now ().isoformat (),
96108 },
97109 session_id ,
98110 )
99111 async for event in self ._listen (session_id ):
100112 yield event
101113
114+ def send_message (
115+ self ,
116+ session_id : str ,
117+ content : str ,
118+ * ,
119+ attachments : Optional [list [dict [str , Any ]]] = None ,
120+ referenced_sessions : Optional [list [dict [str , str ]]] = None ,
121+ ) -> None :
122+ """Fire-and-forget message send (no event listening)."""
123+ from datetime import datetime
124+ self ._sio .emit (
125+ C2SEvent .SESSION_MESSAGE ,
126+ {
127+ "content" : content ,
128+ "attachments" : attachments or [],
129+ "referenced_sessions" : referenced_sessions or [],
130+ "client_now_date" : datetime .now ().isoformat (),
131+ },
132+ session_id ,
133+ )
134+
102135 async def _listen (self , session_id : str ) -> AsyncGenerator [ChatEvent , None ]:
103136 """Listen for events with stream buffering."""
137+ # Check session state before entering loop — don't hang on completed sessions
138+ if self ._check_session_state :
139+ try :
140+ session = await self ._check_session_state (session_id )
141+ if session .get ("state" ) in TERMINAL_STATES :
142+ yield ChatEvent (type = S2CEvent .SESSION_STATE , session_id = session_id , data = {"content" : session ["state" ]})
143+ return
144+ except Exception :
145+ pass # best effort
146+
104147 text_buffer = TextPartBuffer ()
105148 queue : asyncio .Queue [Optional [ChatEvent ]] = asyncio .Queue ()
106149 done = False
107- # Only terminate on waiting_input AFTER agent has sent substantive content.
108- # The initial waiting_input (default state) arrives before agent starts.
109150 received_agent_response = False
110151
111152 # Work log debounce state
@@ -162,37 +203,41 @@ def handler(event: str, raw: dict[str, Any]) -> None:
162203 wl_timers [step_id ] = loop .call_later (3.0 , flush_wl , step_id )
163204 return
164205
165- # Tier 2: immediate events
166- if event in IMMEDIATE_EVENTS :
167- nonlocal received_agent_response
168- queue .put_nowait (ChatEvent (
169- type = event , session_id = session_id ,
170- message_id = message_id , data = data , metadata = metadata ,
171- ))
172- # Track substantive agent responses
173- if event in (
174- S2CEvent .SESSION_TEXT , S2CEvent .SESSION_FORM_TO_USER ,
175- S2CEvent .SESSION_ASK_FOR_LOCATION , S2CEvent .SESSION_TASK_READY ,
176- S2CEvent .SESSION_TASK_FINISHED , S2CEvent .SESSION_INTERACTIVE_AUTH_CONFIRMATION ,
177- S2CEvent .SESSION_THREE_WAY_CALL , S2CEvent .SESSION_REWARD ,
178- ):
179- received_agent_response = True
180- # Terminal conditions — only after agent has spoken
181- if event == S2CEvent .SESSION_INPUT_STATE and isinstance (data , dict ):
182- if data .get ("content" ) == "waiting_input" and received_agent_response :
183- done = True
184- queue .put_nowait (None )
185- if event == S2CEvent .SESSION_STATE and isinstance (data , dict ):
186- state = data .get ("content" , "" )
187- if state in ("task_finished" , "task_cancelled" , "task_stale" ):
188- done = True
189- queue .put_nowait (None )
206+ # All other events: dispatch immediately (pass-through for agent)
207+ nonlocal received_agent_response
208+ queue .put_nowait (ChatEvent (
209+ type = event , session_id = session_id ,
210+ message_id = message_id , data = data , metadata = metadata ,
211+ ))
212+ if event in SUBSTANTIVE_EVENTS :
213+ received_agent_response = True
214+ if event == S2CEvent .SESSION_INPUT_STATE and isinstance (data , dict ):
215+ if data .get ("content" ) == "waiting_input" and received_agent_response :
216+ done = True
217+ queue .put_nowait (None )
218+ if event == S2CEvent .SESSION_STATE and isinstance (data , dict ):
219+ state = data .get ("content" , "" )
220+ if state in TERMINAL_STATES :
221+ done = True
222+ queue .put_nowait (None )
190223
191224 remove_handler = self ._sio .add_event_handler (handler )
192225
193226 try :
194227 while not done :
195- evt = await queue .get ()
228+ try :
229+ evt = await asyncio .wait_for (queue .get (), timeout = self ._idle_timeout_s )
230+ except asyncio .TimeoutError :
231+ # Idle timeout — check session state via REST
232+ if self ._check_session_state :
233+ try :
234+ session = await self ._check_session_state (session_id )
235+ if session .get ("state" ) in TERMINAL_STATES :
236+ yield ChatEvent (type = S2CEvent .SESSION_STATE , session_id = session_id , data = {"content" : session ["state" ]})
237+ break
238+ except Exception :
239+ pass
240+ continue
196241 if evt is None :
197242 break
198243 yield evt
0 commit comments