-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathexecution_timeout.py
More file actions
231 lines (190 loc) · 8 KB
/
execution_timeout.py
File metadata and controls
231 lines (190 loc) · 8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""
This module provides execution timeout monitoring for Jupyter notebook cells.
It can detect long-running executions and optionally send warnings or interrupt them.
"""
import os
import signal
import threading
import time
from typing import Optional
import requests
from IPython.core.interactiveshell import ExecutionInfo, ExecutionResult
from .get_webapp_url import get_absolute_userpod_api_url
from .logging import LoggerManager
class ExecutionTimeoutMonitor:
    """
    Monitors execution duration and can send warnings or interrupt stuck executions.

    ``on_pre_execute`` records the cell being run and arms one-shot daemon
    ``threading.Timer`` objects; ``on_post_execute`` cancels them. Each timer
    is bound to the execution record it was armed for, so a timer whose
    callback runs after its cell has finished (or after a newer cell has
    started) recognises itself as stale and does nothing.

    Both hooks accept their IPython argument optionally, so they work when
    registered on ``pre_run_cell``/``post_run_cell`` (which pass
    ``ExecutionInfo``/``ExecutionResult``) as well as on the argument-less
    ``pre_execute``/``post_execute`` events.
    """

    def __init__(
        self,
        warning_threshold_seconds: int = 240,
        timeout_seconds: int = 300,
        enable_auto_interrupt: bool = False,
    ):
        """
        Initialize the execution timeout monitor.

        Args:
            warning_threshold_seconds: Seconds after which to send a warning
                (default: 240s = 4min). A value <= 0 disables the warning.
            timeout_seconds: Seconds after which to consider execution stuck
                (default: 300s = 5min). A value <= 0 disables the interrupt.
            enable_auto_interrupt: Whether to automatically interrupt stuck
                executions (default: False).
        """
        self.logger = LoggerManager().get_logger()
        self.warning_threshold = warning_threshold_seconds
        self.timeout_threshold = timeout_seconds
        self.enable_auto_interrupt = enable_auto_interrupt
        # {"code": preview, "start": time.time()} for the running cell, or None
        # when idle. The dict's identity doubles as the timers' staleness token.
        self.current_execution: Optional[dict] = None
        self.warning_timer: Optional[threading.Timer] = None
        self.timeout_timer: Optional[threading.Timer] = None
        # Guards current_execution and the timer attributes, which are touched
        # both by the hook callers and by the timer threads.
        self._execution_lock = threading.Lock()

    def on_pre_execute(self, info: "Optional[ExecutionInfo]" = None) -> None:
        """
        Called before executing a cell.

        Records the cell being run and arms the warning timer and, when
        auto-interrupt is enabled, the timeout timer. Timers left over from a
        previous execution are cancelled first so they cannot fire against
        this one.

        Args:
            info: IPython execution info; only ``raw_cell`` is read. May be
                omitted, in which case the preview is ``"<empty>"``.
        """
        raw_cell = getattr(info, "raw_cell", None)
        cell_preview = raw_cell[:100] if raw_cell else "<empty>"
        with self._execution_lock:
            # A skipped post hook (or a nested run) would otherwise leave
            # orphaned timers running.
            self._cancel_timers()
            execution = {
                "code": cell_preview,
                "start": time.time(),
            }
            self.current_execution = execution
            if self.warning_threshold > 0:
                self.warning_timer = self._start_timer(
                    self.warning_threshold, self._send_warning, execution
                )
            if self.enable_auto_interrupt and self.timeout_threshold > 0:
                self.timeout_timer = self._start_timer(
                    self.timeout_threshold, self._interrupt_execution, execution
                )
        self.logger.debug(
            "Timeout monitoring started: warning=%ds, timeout=%ds, auto_interrupt=%s",
            self.warning_threshold,
            self.timeout_threshold,
            self.enable_auto_interrupt,
        )

    def on_post_execute(self, result: "Optional[ExecutionResult]" = None) -> None:
        """
        Called after executing a cell.

        Cancels any pending timers and marks the monitor idle.

        Args:
            result: IPython execution result; unused, and may be omitted.
        """
        with self._execution_lock:
            self._cancel_timers()
            self.current_execution = None

    @staticmethod
    def _start_timer(delay, callback, execution: dict) -> threading.Timer:
        """Start a daemon one-shot timer that calls ``callback(execution)``."""
        timer = threading.Timer(delay, callback, args=(execution,))
        # Daemon so a pending timer never keeps the interpreter alive at exit.
        timer.daemon = True
        timer.start()
        return timer

    def _cancel_timers(self) -> None:
        """Cancel all active timers. Caller must hold ``_execution_lock``."""
        if self.warning_timer:
            self.warning_timer.cancel()
            self.warning_timer = None
        if self.timeout_timer:
            self.timeout_timer.cancel()
            self.timeout_timer = None

    def _snapshot_if_current(self, execution: Optional[dict]) -> Optional[dict]:
        """
        Return a copy of the current execution if it is still ``execution``.

        Caller must hold ``_execution_lock``. ``execution=None`` matches
        whatever execution is current. Returns None when idle or when the
        token is stale.
        """
        current = self.current_execution
        if not current:
            return None
        if execution is not None and current is not execution:
            return None
        return current.copy()

    def _send_warning(self, execution: Optional[dict] = None) -> None:
        """
        Log and report an execution that has exceeded the warning threshold.

        Args:
            execution: The execution record this timer was armed for. If it
                is no longer current (the cell finished, or a newer one
                started) this is a no-op.
        """
        with self._execution_lock:
            snapshot = self._snapshot_if_current(execution)
        if snapshot is None:
            return
        # Log and report outside the lock: the webapp request may block.
        duration = time.time() - snapshot["start"]
        code_preview = snapshot["code"][:50]
        self.logger.warning(
            "LONG_EXECUTION | duration=%.1fs | preview=%s",
            duration,
            code_preview.replace("\n", "\\n"),
        )
        self._report_to_webapp(duration, code_preview, warning=True)

    def _interrupt_execution(self, execution: Optional[dict] = None) -> None:
        """
        Interrupt execution after the timeout threshold is exceeded.

        Reports the timeout to the webapp, then sends SIGINT to this process
        (like Ctrl+C), but only if the same execution is still running.

        Args:
            execution: The execution record this timer was armed for. If it
                is no longer current this is a no-op.
        """
        with self._execution_lock:
            target = self.current_execution
            snapshot = self._snapshot_if_current(execution)
        if snapshot is None:
            return
        duration = time.time() - snapshot["start"]
        code_preview = snapshot["code"][:50]
        self.logger.error(
            "TIMEOUT_INTERRUPT | duration=%.1fs | Sending SIGINT to interrupt execution",
            duration,
        )
        # Report to webapp before interrupting
        self._report_to_webapp(duration, code_preview, warning=False)
        # The report can block for seconds; if the cell finished meanwhile, a
        # SIGINT would land on the idle kernel or on the next cell instead.
        with self._execution_lock:
            still_running = self.current_execution is target
        if not still_running:
            self.logger.warning(
                "TIMEOUT_INTERRUPT skipped | execution finished before SIGINT was sent"
            )
            return
        # Send SIGINT to interrupt the execution (simulates Ctrl+C)
        try:
            os.kill(os.getpid(), signal.SIGINT)
        except Exception as e:  # pylint: disable=broad-exception-caught
            self.logger.error("Failed to send SIGINT: %s", e)

    def _report_to_webapp(
        self, duration: float, code_preview: str, warning: bool
    ) -> None:
        """
        Report execution warning/timeout to webapp (best effort).

        POSTs to ``execution/warning`` or ``execution/timeout``; any failure
        is logged and swallowed so monitoring never breaks execution.

        Args:
            duration: Execution duration in seconds
            code_preview: Preview of the code being executed
            warning: Whether this is a warning (True) or timeout (False)
        """
        try:
            endpoint = "warning" if warning else "timeout"
            url = get_absolute_userpod_api_url(f"execution/{endpoint}")
            payload = {
                "duration": duration,
                "code_preview": code_preview,
                "threshold": (
                    self.warning_threshold if warning else self.timeout_threshold
                ),
            }
            response = requests.post(url, json=payload, timeout=2)
            response.raise_for_status()
            self.logger.debug("Successfully reported %s to webapp", endpoint)
        except Exception as e:  # pylint: disable=broad-exception-caught
            self.logger.error("Failed to report to webapp: %s", e)
# Process-wide monitor instance; stays None until
# setup_execution_timeout_monitor() creates one.
_timeout_monitor: "Optional[ExecutionTimeoutMonitor]" = None
def setup_execution_timeout_monitor(
    warning_threshold_seconds: int = 240,
    timeout_seconds: int = 300,
    enable_auto_interrupt: bool = False,
) -> None:
    """
    Set up execution timeout monitoring.

    This is optional and should be called during runtime initialization if needed.
    The monitor is hooked to IPython's ``pre_run_cell`` and ``post_run_cell``
    events, whose callbacks receive ``ExecutionInfo`` and ``ExecutionResult``
    respectively. (``pre_execute``/``post_execute`` call their callbacks with
    no arguments, so they cannot drive ``on_pre_execute(info)`` /
    ``on_post_execute(result)``.)

    Calling this again replaces the previously installed monitor: its
    callbacks are unregistered and its pending timers cancelled, so a cell is
    never timed (or interrupted) twice. The new monitor is stored in
    ``_timeout_monitor`` only once both callbacks are registered.

    Errors are logged rather than raised, so a failure here never breaks
    runtime initialization.

    Args:
        warning_threshold_seconds: Seconds after which to send a warning (default: 240s = 4min)
        timeout_seconds: Seconds after which to consider execution stuck (default: 300s = 5min)
        enable_auto_interrupt: Whether to automatically interrupt stuck executions (default: False)
    """
    global _timeout_monitor  # pylint: disable=global-statement
    try:
        from IPython import get_ipython  # pylint: disable=import-outside-toplevel

        ip = get_ipython()
        if ip is None:
            LoggerManager().get_logger().warning(
                "IPython instance not available, skipping timeout monitor setup"
            )
            return

        monitor = ExecutionTimeoutMonitor(
            warning_threshold_seconds=warning_threshold_seconds,
            timeout_seconds=timeout_seconds,
            enable_auto_interrupt=enable_auto_interrupt,
        )

        previous = _timeout_monitor
        if previous is not None:
            # Detach the old monitor so its callbacks don't keep firing
            # alongside the new one.
            for event, callback in (
                ("pre_run_cell", previous.on_pre_execute),
                ("post_run_cell", previous.on_post_execute),
            ):
                try:
                    ip.events.unregister(event, callback)
                except ValueError:
                    pass  # already unregistered elsewhere; nothing to detach
            # Cancel any timers the old monitor still has armed.
            previous.on_post_execute(None)
            _timeout_monitor = None

        # Register event handlers (these events pass ExecutionInfo/ExecutionResult)
        ip.events.register("pre_run_cell", monitor.on_pre_execute)
        ip.events.register("post_run_cell", monitor.on_post_execute)
        _timeout_monitor = monitor
        LoggerManager().get_logger().info(
            "Execution timeout monitor initialized: warning=%ds, timeout=%ds, auto_interrupt=%s",
            warning_threshold_seconds,
            timeout_seconds,
            enable_auto_interrupt,
        )
    except Exception as e:  # pylint: disable=broad-exception-caught
        LoggerManager().get_logger().error(
            "Failed to set up timeout monitor: %s", e
        )