Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 50 additions & 21 deletions livekit-agents/livekit/agents/ipc/supervised_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ def _add_proc_ctx_log(record: logging.LogRecord) -> None:
mp_cch.close()

self._pid = self._proc.pid
if sys.platform != "win32":
self._proc_sentinel = os.dup(self._proc.sentinel)
self._join_fut = asyncio.Future[None]()

def _sync_run() -> None:
Expand Down Expand Up @@ -266,44 +268,70 @@ async def initialize(self) -> None:
self._initialize_fut.set_exception(e)
raise

def _on_exit_resolve_futures(self) -> None:
    """Resolve the shutdown futures; used as the add_reader callback for process exit.

    Both ``_shutdown_ack_fut`` and ``_shutting_down_fut`` are completed with
    ``None``. A future that is already done is left untouched, so calling
    this more than once is harmless.
    """
    # Same order as the hand-written checks: ack first, then shutting-down.
    for fut_name in ("_shutdown_ack_fut", "_shutting_down_fut"):
        pending = getattr(self, fut_name)
        if pending.done():
            continue
        pending.set_result(None)

def _close_sentinel(self) -> None:
    """Close the dup'd sentinel fd exactly once.

    Does nothing when ``_proc_sentinel`` is not set (it is only created on
    non-Windows platforms, and is removed by the first successful call).

    The attribute is detached *before* any cleanup runs, so a failure in
    ``remove_reader`` or ``os.close`` can never leave a stale fd number
    behind for a later call to close a second time. The fd is closed even
    if unregistering the reader raises; that exception still propagates.
    """
    try:
        fd = self._proc_sentinel
    except AttributeError:
        # Already closed, or never created.
        return

    # Drop ownership first: from here on no other call can see this fd.
    del self._proc_sentinel
    try:
        self._loop.remove_reader(fd)
    finally:
        os.close(fd)

async def aclose(self) -> None:
"""attempt to gracefully close the supervised process"""
# NOTE(review): this span comes from a rendered diff view; removed and added
# lines are interleaved and indentation was lost, so several statements below
# appear twice (old and new position). Read it as a change description, not as
# runnable code.
# Nothing to close if the process was never started.
if not self.started:
return

self._closing = True
# Ask the child to shut down; a closed duplex channel is ignored.
with contextlib.suppress(duplex_unix.DuplexClosed):
await channel.asend_message(self._pch, proto.ShutdownRequest())

try:
await asyncio.wait_for(self._shutdown_ack_fut, timeout=self._opts.close_timeout)
except asyncio.TimeoutError:
logger.error(
"process did not ack shutdown in time, killing process",
extra=self.logging_extra(),
)
await self._send_dump_signal()
await self._send_kill_signal()
# Non-Windows only, and only when the sentinel attribute exists: register
# _on_exit_resolve_futures as a reader callback on the sentinel fd, so the
# futures awaited below get resolved by that callback.
if sys.platform != "win32" and hasattr(self, "_proc_sentinel"):
loop = asyncio.get_running_loop()
loop.add_reader(self._proc_sentinel, self._on_exit_resolve_futures)

if not self._shutting_down_fut.done():
await self._shutting_down_fut
try:
with contextlib.suppress(duplex_unix.DuplexClosed):
await channel.asend_message(self._pch, proto.ShutdownRequest())

if self._supervise_atask and not self._supervise_atask.done():
try:
await asyncio.wait_for(
asyncio.shield(self._supervise_atask), timeout=self._opts.close_timeout
)
await asyncio.wait_for(self._shutdown_ack_fut, timeout=self._opts.close_timeout)
except asyncio.TimeoutError:
logger.error(
"process did not exit in time, killing process",
"process did not ack shutdown in time, killing process",
extra=self.logging_extra(),
)
await self._send_dump_signal()
await self._send_kill_signal()

async with self._lock:
if self._supervise_atask:
await asyncio.shield(self._supervise_atask)
if not self._shutting_down_fut.done():
await self._shutting_down_fut

# Give a still-running supervise task up to close_timeout to finish. It is
# wrapped in asyncio.shield so the wait_for timeout does not cancel the task
# itself; on timeout, log and send the dump signal followed by the kill signal.
if self._supervise_atask and not self._supervise_atask.done():
try:
await asyncio.wait_for(
asyncio.shield(self._supervise_atask),
timeout=self._opts.close_timeout,
)
except asyncio.TimeoutError:
logger.error(
"process did not exit in time, killing process",
extra=self.logging_extra(),
)
await self._send_dump_signal()
await self._send_kill_signal()

# Under self._lock, await the (shielded) supervise task if there is one.
async with self._lock:
if self._supervise_atask:
await asyncio.shield(self._supervise_atask)
# Cleanup runs whether or not the shutdown sequence above raised.
finally:
if hasattr(self, "_proc_sentinel"):
loop = asyncio.get_running_loop()
loop.remove_reader(self._proc_sentinel)
self._close_sentinel()

async def kill(self) -> None:
"""forcefully kill the supervised process"""
Expand Down Expand Up @@ -393,6 +421,7 @@ async def _supervise_task(self) -> None:
await self._join_fut
self._exitcode = self._proc.exitcode
self._proc.close()
self._close_sentinel()
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
await aio.cancel_and_wait(ping_task, read_ipc_task, main_task)

if memory_monitor_task is not None:
Expand Down
Loading