44from rich .console import Console
55
66from agentex import Agentex
7+ from agentex .lib .constants .ports import TEMPORAL_ADDRESS as DEFAULT_TEMPORAL_ADDRESS
78from agentex .lib .utils .logging import make_logger
89
910# Import Temporal client for direct workflow termination
1920def should_cleanup_on_restart () -> bool :
2021 """
2122 Check if cleanup should be performed on restart.
22-
23+
2324 Returns True if:
2425 - ENVIRONMENT=development, OR
2526 - AUTO_CLEANUP_ON_RESTART=true
2627 """
2728 env = os .getenv ("ENVIRONMENT" , "" ).lower ()
2829 auto_cleanup = os .getenv ("AUTO_CLEANUP_ON_RESTART" , "true" ).lower ()
29-
30+
3031 return env == "development" or auto_cleanup == "true"
3132
3233
33- def cleanup_agent_workflows (
34- agent_name : str ,
35- force : bool = False ,
36- development_only : bool = True
37- ) -> None :
34+ def cleanup_agent_workflows (agent_name : str , force : bool = False , development_only : bool = True ) -> None :
3835 """
3936 Clean up all running workflows for an agent during development.
40-
37+
4138 This cancels (graceful) all running tasks for the specified agent.
4239 When force=True, directly terminates workflows via Temporal client.
43-
40+
4441 Args:
4542 agent_name: Name of the agent to cleanup workflows for
4643 force: If True, directly terminate workflows via Temporal client
4744 development_only: Only perform cleanup in development environment
4845 """
49-
46+
5047 # Safety check - only run in development mode by default
5148 if development_only and not force and not should_cleanup_on_restart ():
5249 logger .warning ("Cleanup skipped - not in development mode. Use --force to override." )
5350 return
54-
51+
5552 method = "terminate (direct)" if force else "cancel (via agent)"
5653 console .print (f"[blue]Cleaning up workflows for agent '{ agent_name } ' using { method } ...[/blue]" )
57-
54+
5855 try :
5956 client = Agentex ()
60-
57+
6158 # Get all running tasks
6259 if agent_name :
6360 all_tasks = client .tasks .list (agent_name = agent_name )
6461 else :
6562 all_tasks = client .tasks .list ()
66- running_tasks = [task for task in all_tasks if hasattr (task , ' status' ) and task .status == "RUNNING" ]
67-
63+ running_tasks = [task for task in all_tasks if hasattr (task , " status" ) and task .status == "RUNNING" ]
64+
6865 if not running_tasks :
6966 console .print ("[yellow]No running tasks found[/yellow]" )
7067 return
71-
68+
7269 console .print (f"[blue]Cleaning up { len (running_tasks )} running task(s) for agent '{ agent_name } '...[/blue]" )
73-
70+
7471 successful_cleanups = 0
7572 total_tasks = len (running_tasks )
76-
73+
7774 for task in running_tasks :
7875 task_cleanup_success = False
79-
76+
8077 if force :
8178 # Force mode: Do both graceful RPC cancellation AND direct Temporal termination
8279 rpc_success = False
8380 temporal_success = False
84-
81+
8582 try :
8683 # First: Graceful cancellation via agent RPC (handles database/agent cleanup)
8784 cleanup_single_task (client , agent_name , task .id )
8885 logger .debug (f"Completed RPC cancellation for task { task .id } " )
8986 rpc_success = True
9087 except Exception as e :
9188 logger .warning (f"RPC cancellation failed for task { task .id } : { e } " )
92-
89+
9390 try :
9491 # Second: Direct Temporal termination (ensures workflow is forcefully stopped)
9592 asyncio .run (cleanup_single_task_direct (task .id ))
9693 logger .debug (f"Completed Temporal termination for task { task .id } " )
9794 temporal_success = True
9895 except Exception as e :
9996 logger .warning (f"Temporal termination failed for task { task .id } : { e } " )
100-
97+
10198 # Count as success if either operation succeeded
10299 task_cleanup_success = rpc_success or temporal_success
103-
100+
104101 else :
105102 # Normal mode: Only graceful cancellation via agent RPC
106103 try :
@@ -109,21 +106,25 @@ def cleanup_agent_workflows(
109106 except Exception as e :
110107 logger .error (f"Failed to cleanup task { task .id } : { e } " )
111108 task_cleanup_success = False
112-
109+
113110 if task_cleanup_success :
114111 successful_cleanups += 1
115112 logger .debug (f"Successfully cleaned up task { task .id } " )
116113 else :
117114 logger .error (f"Failed to cleanup task { task .id } " )
118115 # Don't increment successful_cleanups for actual failures
119-
116+
120117 if successful_cleanups == total_tasks :
121- console .print (f"[green]✓ Successfully cleaned up all { successful_cleanups } task(s) for agent '{ agent_name } '[/green]" )
118+ console .print (
119+ f"[green]✓ Successfully cleaned up all { successful_cleanups } task(s) for agent '{ agent_name } '[/green]"
120+ )
122121 elif successful_cleanups > 0 :
123- console .print (f"[yellow]⚠ Successfully cleaned up { successful_cleanups } /{ total_tasks } task(s) for agent '{ agent_name } '[/yellow]" )
122+ console .print (
123+ f"[yellow]⚠ Successfully cleaned up { successful_cleanups } /{ total_tasks } task(s) for agent '{ agent_name } '[/yellow]"
124+ )
124125 else :
125126 console .print (f"[red]✗ Failed to cleanup any tasks for agent '{ agent_name } '[/red]" )
126-
127+
127128 except Exception as e :
128129 console .print (f"[red]Agent workflow cleanup failed: { str (e )} [/red]" )
129130 logger .exception ("Agent workflow cleanup failed" )
@@ -133,51 +134,47 @@ def cleanup_agent_workflows(
133134async def cleanup_single_task_direct (task_id : str ) -> None :
134135 """
135136 Directly terminate a workflow using Temporal client.
136-
137+
137138 Args:
138139 task_id: ID of the task (used as workflow_id)
139140 """
140141 if TemporalClient is None :
141142 raise ImportError ("temporalio package not available for direct workflow termination" )
142-
143+
143144 try :
144145 # Connect to Temporal server (assumes default localhost:7233)
145- client = await TemporalClient .connect ("localhost:7233" ) # type: ignore
146-
146+ client = await TemporalClient .connect (DEFAULT_TEMPORAL_ADDRESS ) # type: ignore
147+
147148 # Get workflow handle and terminate
148149 handle = client .get_workflow_handle (workflow_id = task_id ) # type: ignore
149150 await handle .terminate () # type: ignore
150-
151+
151152 logger .debug (f"Successfully terminated workflow { task_id } via Temporal client" )
152-
153+
153154 except Exception as e :
154155 # Check if the workflow was already completed - this is actually a success case
155156 if "workflow execution already completed" in str (e ).lower ():
156157 logger .debug (f"Workflow { task_id } was already completed - no termination needed" )
157158 return # Don't raise an exception for this case
158-
159+
159160 logger .error (f"Failed to terminate workflow { task_id } via Temporal client: { e } " )
160161 raise
161162
162163
163164def cleanup_single_task (client : Agentex , agent_name : str , task_id : str ) -> None :
164165 """
165166 Clean up a single task/workflow using agent RPC cancel method.
166-
167+
167168 Args:
168- client: Agentex client instance
169+ client: Agentex client instance
169170 agent_name: Name of the agent that owns the task
170171 task_id: ID of the task to cleanup
171172 """
172173 try :
173174 # Use the agent RPC method to cancel the task
174- client .agents .rpc_by_name (
175- agent_name = agent_name ,
176- method = "task/cancel" ,
177- params = {"task_id" : task_id }
178- )
175+ client .agents .rpc_by_name (agent_name = agent_name , method = "task/cancel" , params = {"task_id" : task_id })
179176 logger .debug (f"Successfully cancelled task { task_id } via agent '{ agent_name } '" )
180-
177+
181178 except Exception as e :
182179 logger .warning (f"RPC task/cancel failed for task { task_id } : { e } " )
183- raise
180+ raise
0 commit comments