Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/py/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ async with kaleido.Kaleido(n=4, timeout=90) as k:
# n is number of processes
await k.write_fig(fig, path="./", opts={"format":"jpg"})

# You can also set a default timeout via environment variable:
# export KALEIDO_RENDER_TIMEOUT=30
# Use "none" to disable timeouts.

# other `kaleido.Kaleido` arguments:
# page: Change library version (see PageGenerators below)

Expand Down
74 changes: 65 additions & 9 deletions src/py/kaleido/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@

from __future__ import annotations

from typing import TYPE_CHECKING
import warnings
from typing import TYPE_CHECKING, Literal

from choreographer.cli import get_chrome, get_chrome_sync

from . import _sync_server
from ._page_generator import PageGenerator
from .kaleido import Kaleido
from .kaleido import Kaleido, _resolve_timeout

if TYPE_CHECKING:
from collections.abc import AsyncIterable, Iterable
Expand Down Expand Up @@ -84,6 +85,7 @@ async def calc_fig(
*,
topojson: str | None = None,
kopts: dict[str, Any] | None = None,
timeout: float | None | Literal["auto"] = "auto",
):
"""
Return binary for plotly figure.
Expand All @@ -99,6 +101,7 @@ async def calc_fig(

"""
kopts = kopts or {}
kopts.setdefault("timeout", timeout)
kopts["n"] = 1 # should we force this?
async with Kaleido(**kopts) as k:
return await k.calc_fig(
Expand All @@ -115,6 +118,7 @@ async def write_fig(
*,
topojson: str | None = None,
kopts: dict[str, Any] | None = None,
timeout: float | None | Literal["auto"] = "auto",
**kwargs,
):
"""
Expand All @@ -128,7 +132,9 @@ async def write_fig(
See also the documentation for `Kaleido.write_fig()`.

"""
async with Kaleido(**(kopts or {})) as k:
kopts = kopts or {}
kopts.setdefault("timeout", timeout)
async with Kaleido(**kopts) as k:
return await k.write_fig(
fig,
path=path,
Expand All @@ -142,6 +148,7 @@ async def write_fig_from_object(
fig_dicts: FigureDict | AnyIterable[FigureDict],
*,
kopts: dict[str, Any] | None = None,
timeout: float | None | Literal["auto"] = "auto",
**kwargs,
):
"""
Expand All @@ -155,36 +162,85 @@ async def write_fig_from_object(
See also the documentation for `Kaleido.write_fig_from_object()`.

"""
async with Kaleido(**(kopts or {})) as k:
kopts = kopts or {}
kopts.setdefault("timeout", timeout)
async with Kaleido(**kopts) as k:
return await k.write_fig_from_object(
fig_dicts,
**kwargs,
)


def calc_fig_sync(*args: Any, **kwargs: Any):
def calc_fig_sync(
*args: Any,
timeout: float | None | Literal["auto"] = "auto",
**kwargs: Any,
):
"""Call `calc_fig` but blocking."""
if _global_server.is_running():
if timeout != "auto":
warnings.warn(
"The timeout argument is ignored if using a server.",
UserWarning,
stacklevel=2,
)
return _global_server.call_function("calc_fig", *args, **kwargs)
else:
return _sync_server.oneshot_async_run(calc_fig, args=args, kwargs=kwargs)
kwargs.setdefault("timeout", timeout)
sync_timeout = _resolve_timeout(timeout)
return _sync_server.oneshot_async_run(
calc_fig,
args=args,
kwargs=kwargs,
sync_timeout=sync_timeout,
)


def write_fig_sync(*args: Any, **kwargs: Any):
def write_fig_sync(
*args: Any,
timeout: float | None | Literal["auto"] = "auto",
**kwargs: Any,
):
"""Call `write_fig` but blocking."""
if _global_server.is_running():
if timeout != "auto":
warnings.warn(
"The timeout argument is ignored if using a server.",
UserWarning,
stacklevel=2,
)
return _global_server.call_function("write_fig", *args, **kwargs)
else:
return _sync_server.oneshot_async_run(write_fig, args=args, kwargs=kwargs)
kwargs.setdefault("timeout", timeout)
sync_timeout = _resolve_timeout(timeout)
return _sync_server.oneshot_async_run(
write_fig,
args=args,
kwargs=kwargs,
sync_timeout=sync_timeout,
)


def write_fig_from_object_sync(*args: Any, **kwargs: Any):
def write_fig_from_object_sync(
*args: Any,
timeout: float | None | Literal["auto"] = "auto",
**kwargs: Any,
):
"""Call `write_fig_from_object` but blocking."""
if _global_server.is_running():
if timeout != "auto":
warnings.warn(
"The timeout argument is ignored if using a server.",
UserWarning,
stacklevel=2,
)
return _global_server.call_function("write_fig_from_object", *args, **kwargs)
else:
kwargs.setdefault("timeout", timeout)
sync_timeout = _resolve_timeout(timeout)
return _sync_server.oneshot_async_run(
write_fig_from_object,
args=args,
kwargs=kwargs,
sync_timeout=sync_timeout,
)
91 changes: 89 additions & 2 deletions src/py/kaleido/_sync_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import asyncio
import atexit
import os
import signal
import subprocess
import time
import warnings
from functools import partial
from queue import Queue
Expand Down Expand Up @@ -130,6 +134,8 @@ def oneshot_async_run(
func: Callable,
args: tuple[Any, ...],
kwargs: dict[str, Any],
*,
sync_timeout: float | None = None,
) -> Any:
"""
Run a thread to execute a single function.
Expand All @@ -153,9 +159,90 @@ def run(func, q, *args, **kwargs):
except BaseException as e: # noqa: BLE001
q.put(e)

t = Thread(target=run, args=(func, q, *args), kwargs=kwargs)
def _pid_exists(pid: int) -> bool:
try:
os.kill(pid, 0)
return True
except OSError:
return False

def _kill_child_chrome_processes() -> None:
try:
result = subprocess.run(
["ps", "-Ao", "pid=,ppid=,command="],
check=False,
capture_output=True,
text=True,
)
except OSError:
return

children: dict[int, list[int]] = {}
commands: dict[int, str] = {}

for line in result.stdout.splitlines():
if not line.strip():
continue
parts = line.strip().split(maxsplit=2)
if len(parts) < 2:
continue
try:
pid = int(parts[0])
ppid = int(parts[1])
except ValueError:
continue
command = parts[2] if len(parts) > 2 else ""
children.setdefault(ppid, []).append(pid)
commands[pid] = command

descendants: set[int] = set()
stack = [os.getpid()]
while stack:
current = stack.pop()
for child in children.get(current, []):
if child in descendants:
continue
descendants.add(child)
stack.append(child)

chrome_pids = [
pid
for pid in descendants
if "chrome" in commands.get(pid, "").lower()
or "chromium" in commands.get(pid, "").lower()
]

for pid in chrome_pids:
try:
os.kill(pid, signal.SIGTERM)
except OSError:
continue

if chrome_pids:
time.sleep(0.5)

for pid in chrome_pids:
if not _pid_exists(pid):
continue
try:
os.kill(pid, signal.SIGKILL)
except OSError:
continue

t = Thread(
target=run,
args=(func, q, *args),
kwargs=kwargs,
daemon=sync_timeout is not None,
)
t.start()
t.join()
t.join(timeout=sync_timeout)
if t.is_alive():
if sync_timeout is not None:
_kill_child_chrome_processes()
raise TimeoutError(
"Kaleido sync call exceeded the timeout; Chrome termination attempted.",
)
res = q.get()
if isinstance(res, BaseException):
raise res
Expand Down
42 changes: 37 additions & 5 deletions src/py/kaleido/kaleido.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
from __future__ import annotations

import asyncio
import os
import warnings
from collections import deque
from collections.abc import AsyncIterable, Iterable
from pathlib import Path
from typing import TYPE_CHECKING, TypedDict, cast, overload
from typing import TYPE_CHECKING, Literal, TypedDict, cast, overload

import choreographer as choreo
import logistro
Expand Down Expand Up @@ -56,6 +57,35 @@ def _is_figuredict(obj: Any) -> TypeGuard[FigureDict]:

_logger = logistro.getLogger(__name__)

_TIMEOUT_ENV_VAR = "KALEIDO_RENDER_TIMEOUT"
_DEFAULT_TIMEOUT = 90.0
_AUTO_TIMEOUT = "auto"


def _resolve_timeout(timeout: float | None | Literal["auto"]) -> float | None:
if timeout != _AUTO_TIMEOUT:
return timeout

env_value = os.getenv(_TIMEOUT_ENV_VAR)
if env_value is None or env_value.strip() == "":
return _DEFAULT_TIMEOUT

normalized = env_value.strip().lower()
if normalized in {"none", "null", "off"}:
return None

try:
return float(normalized)
except ValueError:
warnings.warn(
f"Invalid {_TIMEOUT_ENV_VAR} value '{env_value}', "
f"falling back to default timeout of {_DEFAULT_TIMEOUT}s.",
RuntimeWarning,
stacklevel=2,
)
return _DEFAULT_TIMEOUT


# Show a warning if the installed Plotly version
# is incompatible with this version of Kaleido
_utils.warn_incompatible_plotly()
Expand Down Expand Up @@ -109,7 +139,7 @@ def __init__(
self,
# *args: Any, force named vars for all choreographer passthrough
n: int = 1,
timeout: float | None = 90,
timeout: float | None | Literal["auto"] = _AUTO_TIMEOUT,
page_generator: None | PageGenerator | str | Path = None,
plotlyjs: str | Path | None = None,
mathjax: str | Path | Literal[False] | None = None,
Expand All @@ -124,9 +154,11 @@ def __init__(
n (int, optional):
Number of processors to use (parallelization). Defaults to 1.

timeout (float | None, optional):
timeout (float | None | "auto", optional):
Number of seconds to wait to render any one image. None for no
timeout. Defaults to 90.
timeout. Defaults to "auto", which uses the
KALEIDO_RENDER_TIMEOUT environment variable when set, otherwise
falls back to 90 seconds.

page_generator (None | PageGenerator | str | Path, optional):
A PageGenerator object can be used for deep customization of the
Expand Down Expand Up @@ -168,7 +200,7 @@ def __init__(
)

page = page_generator
self._timeout = timeout
self._timeout = _resolve_timeout(timeout)
self._n = n
self._plotlyjs = plotlyjs
self._mathjax = mathjax
Expand Down
26 changes: 24 additions & 2 deletions src/py/tests/test_kaleido.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,30 @@ async def slow_calc_fig(*_args, **_kwargs):
await k.write_fig_from_object(fig_generator(), cancel_on_error=True)
pytest.fail("Should never reach this, should have raised.")

ret = await k.write_fig_from_object(fig_generator(), cancel_on_error=False)
assert isinstance(ret[0], (asyncio.TimeoutError, TimeoutError))
ret = await k.write_fig_from_object(fig_generator(), cancel_on_error=False)
assert isinstance(ret[0], (asyncio.TimeoutError, TimeoutError))


async def test_env_timeout_override(monkeypatch, simple_figure_with_bytes):
"""Test that env timeout is used when set to valid value."""
monkeypatch.setenv("KALEIDO_RENDER_TIMEOUT", "0.1")

fig = simple_figure_with_bytes["fig"]
opts = simple_figure_with_bytes["opts"]

async with Kaleido(timeout="auto") as k:

async def slow_calc_fig(*_args, **_kwargs):
await asyncio.sleep(1)
pytest.fail("Should have timed out before reaching here!")

for _ in range(k.tabs_ready.qsize()):
t = await k.tabs_ready.get()
t._calc_fig = slow_calc_fig # noqa: SLF001
await k.tabs_ready.put(t)

with pytest.raises((asyncio.TimeoutError, TimeoutError)): # noqa: PT012
await k.write_fig(fig, opts=opts, cancel_on_error=True)


@pytest.mark.parametrize(
Expand Down