-
Notifications
You must be signed in to change notification settings - Fork 57
Expand file tree
/
Copy path_sync_server.py
More file actions
250 lines (215 loc) · 7.1 KB
/
_sync_server.py
File metadata and controls
250 lines (215 loc) · 7.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
from __future__ import annotations
import asyncio
import atexit
import os
import signal
import subprocess
import time
import warnings
from functools import partial
from queue import Queue
from threading import Thread
from typing import TYPE_CHECKING, NamedTuple
from .kaleido import Kaleido
if TYPE_CHECKING:
from typing import Any, Callable
class Task(NamedTuple):
fn: str
args: Any
kwargs: Any
class _BadFunctionName(BaseException):
"""For use when programmed poorly."""
class GlobalKaleidoServer:
_instance = None
async def _server(self, *args, **kwargs):
async with Kaleido(*args, **kwargs) as k: # multiple processor? Enable GPU?
while True:
task = self._task_queue.get() # thread dies if main thread dies
if task is None:
self._task_queue.task_done()
return
if not hasattr(k, task.fn):
raise _BadFunctionName(f"Kaleido has no attribute {task.fn}")
try:
self._return_queue.put(
await getattr(k, task.fn)(*task.args, **task.kwargs),
)
except Exception as e: # noqa: BLE001
self._return_queue.put(e)
self._task_queue.task_done()
def __new__(cls):
# Create the singleton on first instantiation
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False # noqa: SLF001
return cls._instance
def is_running(self):
return self._initialized
def open(self, *args: Any, silence_warnings=False, **kwargs: Any) -> None:
"""Initialize the singleton with three values."""
if self.is_running():
if not silence_warnings:
warnings.warn(
"Server already open.",
RuntimeWarning,
stacklevel=2,
)
return
coroutine = self._server(*args, **kwargs)
self._thread: Thread = Thread(
target=asyncio.run,
args=(coroutine,),
daemon=True,
)
self._task_queue: Queue[Task | None] = Queue()
self._return_queue: Queue[Any] = Queue()
self._thread.start()
self._initialized = True
close = partial(self.close, silence_warnings=True)
atexit.register(close)
def close(self, *, silence_warnings=False):
"""Reset the singleton back to an uninitialized state."""
if not self.is_running():
if not silence_warnings:
warnings.warn(
"Server already closed.",
RuntimeWarning,
stacklevel=2,
)
return
self._task_queue.put(None)
self._thread.join()
del self._thread
del self._task_queue
del self._return_queue
self._initialized = False
def call_function(self, cmd: str, *args: Any, **kwargs: Any):
"""
Call any function on the singleton Kaleido object.
Preferred functions would be: `calc_fig`, `write_fig`, and
`write_fig_from_object`. Methods that doesn't exist will raise a
BaseException.
Args:
cmd (str): the name of the method to call
args (Any): the method's arguments
kwargs (Any): the method's keyword arguments
"""
if not self.is_running():
raise RuntimeError("Can't call function on stopped server.")
if kwargs.pop("kopts", None):
warnings.warn(
"The kopts argument is ignored if using a server.",
UserWarning,
stacklevel=3,
)
self._task_queue.put(Task(cmd, args, kwargs))
self._task_queue.join()
res = self._return_queue.get()
if isinstance(res, BaseException):
raise res
else:
return res
def oneshot_async_run(
func: Callable,
args: tuple[Any, ...],
kwargs: dict[str, Any],
*,
sync_timeout: float | None = None,
) -> Any:
"""
Run a thread to execute a single function.
Used by _sync functions in
`__init__` to ensure their async loop is separate from the users main
one.
Args:
func: the function to run
args: a tuple of arguments to pass
kwargs: a dictionary of keyword arguments to pass
"""
q: Queue[Any] = Queue(maxsize=1)
def run(func, q, *args, **kwargs):
# func is a closure
try:
q.put(asyncio.run(func(*args, **kwargs)))
except BaseException as e: # noqa: BLE001
q.put(e)
def _pid_exists(pid: int) -> bool:
try:
os.kill(pid, 0)
return True
except OSError:
return False
def _kill_child_chrome_processes() -> None:
try:
result = subprocess.run(
["ps", "-Ao", "pid=,ppid=,command="],
check=False,
capture_output=True,
text=True,
)
except OSError:
return
children: dict[int, list[int]] = {}
commands: dict[int, str] = {}
for line in result.stdout.splitlines():
if not line.strip():
continue
parts = line.strip().split(maxsplit=2)
if len(parts) < 2:
continue
try:
pid = int(parts[0])
ppid = int(parts[1])
except ValueError:
continue
command = parts[2] if len(parts) > 2 else ""
children.setdefault(ppid, []).append(pid)
commands[pid] = command
descendants: set[int] = set()
stack = [os.getpid()]
while stack:
current = stack.pop()
for child in children.get(current, []):
if child in descendants:
continue
descendants.add(child)
stack.append(child)
chrome_pids = [
pid
for pid in descendants
if "chrome" in commands.get(pid, "").lower()
or "chromium" in commands.get(pid, "").lower()
]
for pid in chrome_pids:
try:
os.kill(pid, signal.SIGTERM)
except OSError:
continue
if chrome_pids:
time.sleep(0.5)
for pid in chrome_pids:
if not _pid_exists(pid):
continue
try:
os.kill(pid, signal.SIGKILL)
except OSError:
continue
t = Thread(
target=run,
args=(func, q, *args),
kwargs=kwargs,
daemon=sync_timeout is not None,
)
t.start()
t.join(timeout=sync_timeout)
if t.is_alive():
if sync_timeout is not None:
_kill_child_chrome_processes()
raise TimeoutError(
"Kaleido sync call exceeded the timeout; Chrome termination attempted.",
)
res = q.get()
if isinstance(res, BaseException):
raise res
else:
return res