Skip to content

Commit 017d785

Browse files
authored
Merge pull request #280 from plotly/andrew/use_process_group
Andrew/use process group
2 parents 4e6a8f7 + 2f9373e commit 017d785

15 files changed

Lines changed: 121 additions & 42 deletions

.pre-commit-config.yaml

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,10 @@ repos:
4141
entry: yamllint
4242
language: python
4343
types: [file, yaml]
44-
args: ['-d', "{\
45-
extends: default,\
46-
rules: {\
47-
colons: { max-spaces-after: -1 }\
48-
}\
49-
}"]
44+
args: [
45+
'-d',
46+
"{ extends: default, rules: { colons: { max-spaces-after: -1 } } }",
47+
]
5048
- repo: https://github.com/rhysd/actionlint
5149
rev: v1.7.8
5250
hooks:

CHANGELOG.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
- Add argument to Session/Target send_command with_perf to return
2-
timing information about browser write/read.
1+
v1.3.0
2+
- Change to process group for better killing of multi-process chrome
3+
- Add argument to Session/Target `send_command(..., *, with_perf: bool)` to
4+
return timing information about browser write/read.
35
- Update default chrome from 135.0.7011.0/1418433 to 144.0.7527.0/1544685
46
- Fix: New chrome takes longer/doesn't populate targets right away, so add a
57
retry loop to populate targets

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ dev = [
5555
"pytest-asyncio; python_version < '3.14'",
5656
"pytest-asyncio>=1.2.0; python_version >= '3.14'",
5757
"pytest-xdist",
58+
"typing-extensions>=4.13.2",
5859
]
5960

6061
# uv doens't allow dependency groups to have separate python requirements

src/choreographer/_brokers/_async.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from __future__ import annotations
22

33
import asyncio
4-
import time
54
import warnings
65
from functools import partial
76
from typing import TYPE_CHECKING
@@ -147,7 +146,7 @@ def check_read_loop_error(result: asyncio.Future[Any]) -> None:
147146
async def read_loop() -> None: # noqa: PLR0912, PLR0915, C901
148147
loop = asyncio.get_running_loop()
149148
fn = partial(self._channel.read_jsons, blocking=True)
150-
responses = await loop.run_in_executor(
149+
responses, perf = await loop.run_in_executor(
151150
executor=self._executor,
152151
func=fn,
153152
)
@@ -222,6 +221,7 @@ async def read_loop() -> None: # noqa: PLR0912, PLR0915, C901
222221
event_session.unsubscribe(query)
223222

224223
elif key:
224+
self.read_perfs[key] = perf
225225
_logger.debug(f"Have a response with key {key}")
226226
if key in self.futures:
227227
_logger.debug(f"Found future for key {key}")
@@ -232,7 +232,6 @@ async def read_loop() -> None: # noqa: PLR0912, PLR0915, C901
232232
raise RuntimeError(f"Couldn't find a future for key: {key}")
233233
if not future.done():
234234
future.set_result(response)
235-
self.read_perfs[key] = time.perf_counter()
236235
if len(self.write_perfs) > PERFS_MAX:
237236
self.write_perfs = dict(
238237
list(self.write_perfs.items())[TRIM_SIZE:],
@@ -281,15 +280,14 @@ async def write_json(
281280
self.futures[key] = future
282281
_logger.debug(f"Created future: {key} {future}")
283282
try:
284-
perf_start = time.perf_counter()
285283
async with self._write_lock: # this should be a queue not a lock
286284
loop = asyncio.get_running_loop()
287-
await loop.run_in_executor(
285+
perf = await loop.run_in_executor(
288286
self._executor,
289287
self._channel.write_json,
290288
obj,
291289
)
292-
self.write_perfs[key] = (perf_start, time.perf_counter())
290+
self.write_perfs[key] = perf
293291
except (_manual_thread_pool.ExecutorClosedError, asyncio.CancelledError) as e:
294292
if not future.cancel() or not future.cancelled():
295293
await future # it wasn't canceled, so listen to it before raising

src/choreographer/browser_async.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Provides the async api: `Browser`, `Tab`."""
22

3+
# hello, thank you for visiting
4+
35
from __future__ import annotations
46

57
import asyncio
@@ -30,7 +32,8 @@
3032
from .browsers._interface_type import BrowserImplInterface
3133
from .channels._interface_type import ChannelInterface
3234

33-
_N = MAX_POPULATE_LOOPS = 20
35+
36+
MAX_POPULATE_LOOPS = 40 if "CI" in os.environ else 20
3437

3538

3639
_logger = logistro.getLogger(__name__)
@@ -172,7 +175,7 @@ def run() -> subprocess.Popen[bytes] | subprocess.Popen[str]: # depends on args
172175
await self.populate_targets()
173176
await asyncio.sleep(0.1)
174177
counter += 1
175-
if counter == MAX_POPULATE_LOOPS:
178+
if counter >= MAX_POPULATE_LOOPS:
176179
break
177180
except (BrowserClosedError, BrowserFailedError, asyncio.CancelledError) as e:
178181
raise BrowserFailedError(

src/choreographer/browsers/chromium.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ def get_popen_args(self) -> Mapping[str, Any]:
204204
if isinstance(self._channel, Pipe):
205205
args["stdin"] = self._channel.from_choreo_to_external
206206
args["stdout"] = self._channel.from_external_to_choreo
207+
args["start_new_session"] = True
208+
207209
_logger.debug(f"Returning args: {args}")
208210
return args
209211

src/choreographer/channels/_interface_type.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class ChannelInterface(Protocol):
1414
"""Defines the basic interface of a channel."""
1515

1616
# Not sure I like the obj type
17-
def write_json(self, obj: Mapping[str, Any]) -> None:
17+
def write_json(self, obj: Mapping[str, Any]) -> tuple[float, float]:
1818
...
1919
# """
2020
# Accept an object and send it doesnt the channel serialized.
@@ -24,7 +24,11 @@ def write_json(self, obj: Mapping[str, Any]) -> None:
2424
#
2525
# """
2626

27-
def read_jsons(self, *, blocking: bool = True) -> Sequence[BrowserResponse]:
27+
def read_jsons(
28+
self,
29+
*,
30+
blocking: bool = True,
31+
) -> tuple[Sequence[BrowserResponse], float]:
2832
...
2933
# """
3034
# Read all available jsons in the channel and returns a list of complete ones.

src/choreographer/channels/pipe.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os
66
import platform
77
import sys
8+
import time
89
import warnings
910
from threading import Lock
1011
from typing import TYPE_CHECKING
@@ -78,7 +79,7 @@ def open(self) -> None:
7879
if not self._open_lock.acquire(blocking=False):
7980
raise RuntimeError("Cannot open same pipe twice.")
8081

81-
def write_json(self, obj: Mapping[str, Any]) -> None:
82+
def write_json(self, obj: Mapping[str, Any]) -> tuple[float, float]:
8283
"""
8384
Send one json down the pipe.
8485
@@ -97,6 +98,7 @@ def write_json(self, obj: Mapping[str, Any]) -> None:
9798
f"size: {len(encoded_message)}.",
9899
)
99100
_logger.debug2(f"Full Message: {encoded_message!r}")
101+
start = time.perf_counter()
100102
try:
101103
ret = os.write(self._write_to_browser, encoded_message)
102104
_logger.debug(
@@ -109,12 +111,13 @@ def write_json(self, obj: Mapping[str, Any]) -> None:
109111
except OSError as e:
110112
self.close()
111113
raise ChannelClosedError from e
114+
return (start, time.perf_counter())
112115

113116
def read_jsons( # noqa: PLR0912, PLR0915, C901 branches, complexity
114117
self,
115118
*,
116119
blocking: bool = True,
117-
) -> Sequence[BrowserResponse]:
120+
) -> tuple[Sequence[BrowserResponse], float]:
118121
"""
119122
Read from the pipe and return one or more jsons in a list.
120123
@@ -168,7 +171,7 @@ def read_jsons( # noqa: PLR0912, PLR0915, C901 branches, complexity
168171
raw_buffer += os.read(self._read_from_browser, 10000)
169172
except BlockingIOError:
170173
_logger.debug("BlockingIOError")
171-
return jsons
174+
return jsons, time.perf_counter()
172175
except OSError as e:
173176
_logger.debug("OSError")
174177
self.close()
@@ -182,7 +185,7 @@ def read_jsons( # noqa: PLR0912, PLR0915, C901 branches, complexity
182185
)
183186
_logger.debug2(f"Whole buffer: {raw_buffer!r}")
184187
if raw_buffer is None:
185-
return jsons
188+
return jsons, time.perf_counter()
186189
decoded_buffer = raw_buffer.decode("utf-8")
187190
raw_messages = decoded_buffer.split("\0")
188191
_logger.debug(f"Received {len(raw_messages)} raw_messages.")
@@ -195,7 +198,7 @@ def read_jsons( # noqa: PLR0912, PLR0915, C901 branches, complexity
195198
except:
196199
_logger.exception("Error in trying to decode JSON off our read.")
197200
raise
198-
return jsons
201+
return jsons, time.perf_counter()
199202

200203
def _unblock_fd(self, fd: int) -> None:
201204
try:

src/choreographer/cli/_cli_utils_no_qa.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def diagnose() -> None:
7373
b._browser_impl.pre_open()
7474
cli = b._browser_impl.get_cli()
7575
env = b._browser_impl.get_env() # noqa: F841
76-
args = b._browser_impl.get_popen_args()
76+
pargs = b._browser_impl.get_popen_args()
7777
b._browser_impl.clean()
7878
del b
7979
print("*** cli:")
@@ -86,7 +86,7 @@ def diagnose() -> None:
8686
# print(" " * 8 + f"{k}:{v}")
8787

8888
print("*** Popen args:")
89-
for k, v in args.items():
89+
for k, v in pargs.items():
9090
print(" " * 8 + f"{k}:{v}")
9191
print("*".center(50, "*"))
9292
print("VERSION INFO:".center(50, "*"))

src/choreographer/protocol/devtools_async.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,12 @@ async def send_command(
274274
if not self.sessions.values():
275275
raise RuntimeError("Cannot send_command without at least one valid session")
276276
session = self.get_session()
277-
return await session.send_command(command, params, with_perf=with_perf)
277+
# so mypy can't handle bool = Literal[True, False]
278+
# so this is suboptimal but it quiets typer
279+
if with_perf:
280+
return await session.send_command(command, params, with_perf=True)
281+
else:
282+
return await session.send_command(command, params, with_perf=False)
278283

279284
async def create_session(self) -> Session:
280285
"""Create a new session on this target."""

0 commit comments

Comments
 (0)