-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathprocess_utils.py
More file actions
55 lines (43 loc) · 1.79 KB
/
process_utils.py
File metadata and controls
55 lines (43 loc) · 1.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from __future__ import annotations
import threading
import weakref
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.thread import _worker
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from hstest.dynamic.security.thread_group import ThreadGroup
class DaemonThreadPoolExecutor(ThreadPoolExecutor):
def __init__(self, max_workers: int = 1, name: str = "", group: ThreadGroup = None) -> None:
super().__init__(max_workers=max_workers, thread_name_prefix=name)
self.group = group
# Adjusted method from the ThreadPoolExecutor class just to create threads as daemons
def _adjust_thread_count(self) -> None:
if self._idle_semaphore.acquire(timeout=0):
return
def weakref_cb(_, q=self._work_queue) -> None:
q.put(None)
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = "%s_%d" % (self._thread_name_prefix or self, num_threads)
# Python 3.14+
if hasattr(self, "_create_worker_context"):
args = (
weakref.ref(self, weakref_cb),
self._create_worker_context(),
self._work_queue,
)
else:
args = (
weakref.ref(self, weakref_cb),
self._work_queue,
self._initializer,
self._initargs,
)
t = threading.Thread(name=thread_name, target=_worker, args=args, group=self.group)
t.daemon = True
t.start()
self._threads.add(t)
def is_port_in_use(port):
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(("localhost", port)) == 0