Skip to content

Commit 6aafc01

Browse files
committed
fixes #5
1 parent f535dbe commit 6aafc01

3 files changed

Lines changed: 132 additions & 48 deletions

File tree

conkernelclient/_modidx.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
'syms': { 'conkernelclient.core': { 'conkernelclient.core.ConKernelClient': ('core.html#conkernelclient', 'conkernelclient/core.py'),
99
'conkernelclient.core.ConKernelClient._async_recv_reply': ( 'core.html#conkernelclient._async_recv_reply',
1010
'conkernelclient/core.py'),
11+
'conkernelclient.core.ConKernelClient._check_alive': ( 'core.html#conkernelclient._check_alive',
12+
'conkernelclient/core.py'),
13+
'conkernelclient.core.ConKernelClient._fail_pending': ( 'core.html#conkernelclient._fail_pending',
14+
'conkernelclient/core.py'),
1115
'conkernelclient.core.ConKernelClient.execute': ( 'core.html#conkernelclient.execute',
1216
'conkernelclient/core.py'),
1317
'conkernelclient.core.ConKernelClient.start_channels': ( 'core.html#conkernelclient.start_channels',

conkernelclient/core.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
from jupyter_client.kernelspec import KernelSpec
1616
from jupyter_client import AsyncKernelManager
1717
from traitlets import Type
18-
import asyncio, zmq.asyncio, time
18+
import asyncio, zmq.asyncio, time, logging
19+
20+
# %% ../nbs/00_core.ipynb #737a0fc1
21+
_log = logging.getLogger(__name__)
1922

2023
# %% ../nbs/00_core.ipynb #374b75d0
2124
if not hasattr(Session, '_orig_send'): Session._orig_send = Session.send
@@ -36,6 +39,17 @@ def _send(self, stream, msg_or_type, content=None, parent=None, ident=None,
3639

3740
# %% ../nbs/00_core.ipynb #d6a5fa6a
3841
class ConKernelClient(AsyncKernelClient):
42+
def _fail_pending(self, exc:Exception, skip=None):
43+
for k,(q,_) in list(getattr(self, '_pending', {}).items()):
44+
if k != skip:
45+
try: q.put_nowait(exc)
46+
except asyncio.QueueFull: pass
47+
48+
def _check_alive(self):
49+
if not self.channels_running: raise RuntimeError("Channels not running")
50+
tk = getattr(self, '_shell_reader_task', None)
51+
return tk is not None and not tk.done()
52+
3953
async def start_channels(self, shell:bool=True, iopub:bool=True, stdin:bool=True, hb:bool=True, control:bool=True):
4054
"Start channels, wait for ready, and launch background shell-reply reader"
4155
super().start_channels(shell=shell, iopub=iopub, stdin=stdin, hb=hb, control=control)
@@ -47,19 +61,28 @@ async def _reader():
4761
while True:
4862
try: reply = await self.get_shell_msg(timeout=None)
4963
except Exception as e:
50-
for q in self._pending.values(): await q.put(e)
51-
if self._pending: logging.warning(f"_reader died with pending - {self._pending}: {e}")
52-
else: logging.warning(f"_reader died with no pending: {e}")
64+
self._fail_pending(e)
65+
_log.warning(f"_reader died, pending={list(self._pending)}: {e}")
5366
break
54-
q = self._pending.get(reply["parent_header"].get("msg_id"))
55-
if q: await q.put(reply)
67+
mid = reply["parent_header"].get("msg_id")
68+
pend = self._pending.get(mid)
69+
if pend:
70+
q, soe = pend
71+
try: q.put_nowait(reply)
72+
except asyncio.QueueFull: pass
73+
else: _log.warning(f"Orphan reply for {reply['parent_header'].get('msg_id')}, pending={list(self._pending)}")
74+
cts = reply.get("content", {})
75+
if cts.get("status") in ("error", "aborted") and pend and soe:
76+
exc = RuntimeError(f"Kernel error aborted: {cts.get('ename')}: {cts.get('evalue')}")
77+
self._fail_pending(exc, skip=mid)
5678
self._shell_reader_task = asyncio.create_task(_reader())
5779
await _ready.wait()
5880
await asyncio.sleep(0.2)
5981
return self
6082

6183
def stop_channels(self):
6284
"Stop channels and cancel the background shell-reply reader task"
85+
self._fail_pending(RuntimeError("Shell channels stopped before reply"))
6386
super().stop_channels()
6487
if (tk := getattr(self, '_shell_reader_task', None)):
6588
tk.cancel()
@@ -68,26 +91,29 @@ def stop_channels(self):
6891

6992
async def _async_recv_reply(self, msg_id, timeout=None, channel="shell"):
7093
if channel == "control": return await self._async_get_control_msg(timeout=timeout)
71-
q = self._pending[msg_id]
94+
q, _ = self._pending[msg_id]
7295
try:
7396
res = await asyncio.wait_for(q.get(), timeout)
7497
if isinstance(res, Exception): raise res
7598
return res
76-
except asyncio.TimeoutError as e: raise TimeoutError("Timeout waiting for reply") from e
99+
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
100+
_log.warning(f"Timeout for {msg_id}, pending={list(self._pending)}")
101+
raise TimeoutError("Timeout waiting for reply") from e
77102
finally: self._pending.pop(msg_id, None)
78103

79104
def execute(self, code, user_expressions=None, allow_stdin=None, reply=False, subsh_id=None,
80-
cts_typ='code', timeout=60, msg_id=None, **kw):
105+
cts_typ='code', timeout=60, msg_id=None, stop_on_error=True, **kw):
81106
"Send an execute request, returning a coroutine for the reply if `reply`, else the msg_id"
107+
if not self._check_alive(): return asyncio.sleep(0) if reply else None
82108
if user_expressions is None: user_expressions = {}
83109
if allow_stdin is None: allow_stdin = self.allow_stdin
84-
content = dict(user_expressions=user_expressions, allow_stdin=allow_stdin, subsh_id=subsh_id, **kw)
110+
content = dict(user_expressions=user_expressions, allow_stdin=allow_stdin, subsh_id=subsh_id, stop_on_error=stop_on_error, **kw)
85111
content[cts_typ] = code
86112
msg = self.session.msg("execute_request", content)
87113
if msg_id is not None: msg["header"]["msg_id"] = msg_id
88114
if subsh_id is not None: msg["header"]["subshell_id"] = subsh_id
89115
msg_id = msg["header"]["msg_id"]
90-
if reply: self._pending[msg_id] = asyncio.Queue(maxsize=1)
116+
if reply: self._pending[msg_id] = (asyncio.Queue(maxsize=1), stop_on_error)
91117
self.shell_channel.send(msg)
92118
if not reply: return msg_id
93119
return self._async_recv_reply(msg_id, timeout=timeout)

nbs/00_core.ipynb

Lines changed: 91 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
"from jupyter_client.kernelspec import KernelSpec\n",
4646
"from jupyter_client import AsyncKernelManager\n",
4747
"from traitlets import Type\n",
48-
"import asyncio, zmq.asyncio, time"
48+
"import asyncio, zmq.asyncio, time, logging"
4949
]
5050
},
5151
{
@@ -59,6 +59,17 @@
5959
"from fastcore.utils import patch"
6060
]
6161
},
62+
{
63+
"cell_type": "code",
64+
"execution_count": null,
65+
"id": "737a0fc1",
66+
"metadata": {},
67+
"outputs": [],
68+
"source": [
69+
"#| export\n",
70+
"_log = logging.getLogger(__name__)"
71+
]
72+
},
6273
{
6374
"cell_type": "markdown",
6475
"id": "665c28bb",
@@ -101,6 +112,17 @@
101112
"source": [
102113
"#| export\n",
103114
"class ConKernelClient(AsyncKernelClient):\n",
115+
" def _fail_pending(self, exc:Exception, skip=None):\n",
116+
" for k,(q,_) in list(getattr(self, '_pending', {}).items()):\n",
117+
" if k != skip:\n",
118+
" try: q.put_nowait(exc)\n",
119+
" except asyncio.QueueFull: pass\n",
120+
"\n",
121+
" def _check_alive(self):\n",
122+
" if not self.channels_running: raise RuntimeError(\"Channels not running\")\n",
123+
" tk = getattr(self, '_shell_reader_task', None)\n",
124+
" return tk is not None and not tk.done()\n",
125+
"\n",
104126
" async def start_channels(self, shell:bool=True, iopub:bool=True, stdin:bool=True, hb:bool=True, control:bool=True):\n",
105127
" \"Start channels, wait for ready, and launch background shell-reply reader\"\n",
106128
" super().start_channels(shell=shell, iopub=iopub, stdin=stdin, hb=hb, control=control)\n",
@@ -112,19 +134,28 @@
112134
" while True:\n",
113135
" try: reply = await self.get_shell_msg(timeout=None)\n",
114136
" except Exception as e:\n",
115-
" for q in self._pending.values(): await q.put(e)\n",
116-
" if self._pending: logging.warning(f\"_reader died with pending - {self._pending}: {e}\")\n",
117-
" else: logging.warning(f\"_reader died with no pending: {e}\")\n",
137+
" self._fail_pending(e)\n",
138+
" _log.warning(f\"_reader died, pending={list(self._pending)}: {e}\")\n",
118139
" break\n",
119-
" q = self._pending.get(reply[\"parent_header\"].get(\"msg_id\"))\n",
120-
" if q: await q.put(reply)\n",
140+
" mid = reply[\"parent_header\"].get(\"msg_id\")\n",
141+
" pend = self._pending.get(mid)\n",
142+
" if pend:\n",
143+
" q, soe = pend\n",
144+
" try: q.put_nowait(reply)\n",
145+
" except asyncio.QueueFull: pass\n",
146+
" else: _log.warning(f\"Orphan reply for {reply['parent_header'].get('msg_id')}, pending={list(self._pending)}\")\n",
147+
" cts = reply.get(\"content\", {})\n",
148+
" if cts.get(\"status\") in (\"error\", \"aborted\") and pend and soe:\n",
149+
" exc = RuntimeError(f\"Kernel error aborted: {cts.get('ename')}: {cts.get('evalue')}\")\n",
150+
" self._fail_pending(exc, skip=mid)\n",
121151
" self._shell_reader_task = asyncio.create_task(_reader())\n",
122152
" await _ready.wait()\n",
123153
" await asyncio.sleep(0.2)\n",
124154
" return self\n",
125155
"\n",
126156
" def stop_channels(self):\n",
127157
" \"Stop channels and cancel the background shell-reply reader task\"\n",
158+
" self._fail_pending(RuntimeError(\"Shell channels stopped before reply\"))\n",
128159
" super().stop_channels()\n",
129160
" if (tk := getattr(self, '_shell_reader_task', None)):\n",
130161
" tk.cancel()\n",
@@ -133,26 +164,29 @@
133164
"\n",
134165
" async def _async_recv_reply(self, msg_id, timeout=None, channel=\"shell\"):\n",
135166
" if channel == \"control\": return await self._async_get_control_msg(timeout=timeout)\n",
136-
" q = self._pending[msg_id]\n",
167+
" q, _ = self._pending[msg_id]\n",
137168
" try:\n",
138169
" res = await asyncio.wait_for(q.get(), timeout)\n",
139170
" if isinstance(res, Exception): raise res\n",
140171
" return res\n",
141-
" except asyncio.TimeoutError as e: raise TimeoutError(\"Timeout waiting for reply\") from e\n",
172+
" except (asyncio.TimeoutError, asyncio.CancelledError) as e:\n",
173+
" _log.warning(f\"Timeout for {msg_id}, pending={list(self._pending)}\")\n",
174+
" raise TimeoutError(\"Timeout waiting for reply\") from e\n",
142175
" finally: self._pending.pop(msg_id, None)\n",
143176
"\n",
144177
" def execute(self, code, user_expressions=None, allow_stdin=None, reply=False, subsh_id=None,\n",
145-
" cts_typ='code', timeout=60, msg_id=None, **kw):\n",
178+
" cts_typ='code', timeout=60, msg_id=None, stop_on_error=True, **kw):\n",
146179
" \"Send an execute request, returning a coroutine for the reply if `reply`, else the msg_id\"\n",
180+
" if not self._check_alive(): return asyncio.sleep(0) if reply else None\n",
147181
" if user_expressions is None: user_expressions = {}\n",
148182
" if allow_stdin is None: allow_stdin = self.allow_stdin\n",
149-
" content = dict(user_expressions=user_expressions, allow_stdin=allow_stdin, subsh_id=subsh_id, **kw)\n",
183+
" content = dict(user_expressions=user_expressions, allow_stdin=allow_stdin, subsh_id=subsh_id, stop_on_error=stop_on_error, **kw)\n",
150184
" content[cts_typ] = code\n",
151185
" msg = self.session.msg(\"execute_request\", content)\n",
152186
" if msg_id is not None: msg[\"header\"][\"msg_id\"] = msg_id\n",
153187
" if subsh_id is not None: msg[\"header\"][\"subshell_id\"] = subsh_id\n",
154188
" msg_id = msg[\"header\"][\"msg_id\"]\n",
155-
" if reply: self._pending[msg_id] = asyncio.Queue(maxsize=1)\n",
189+
" if reply: self._pending[msg_id] = (asyncio.Queue(maxsize=1), stop_on_error)\n",
156190
" self.shell_channel.send(msg)\n",
157191
" if not reply: return msg_id\n",
158192
" return self._async_recv_reply(msg_id, timeout=timeout)\n",
@@ -212,6 +246,13 @@
212246
"execution_count": null,
213247
"metadata": {},
214248
"output_type": "execute_result"
249+
},
250+
{
251+
"name": "stderr",
252+
"output_type": "stream",
253+
"text": [
254+
"Orphan reply for 30c03d32-82cd57ee2bae596f194f9300_34664_1, pending=[]\n"
255+
]
215256
}
216257
],
217258
"source": [
@@ -228,7 +269,7 @@
228269
{
229270
"data": {
230271
"text/plain": [
231-
"'95164565-1b052c74632b03fbe217b8de_3713_1'"
272+
"'30c03d32-82cd57ee2bae596f194f9300_34664_1'"
232273
]
233274
},
234275
"execution_count": null,
@@ -293,11 +334,11 @@
293334
{
294335
"data": {
295336
"text/plain": [
296-
"{'msg_id': '95164565-1b052c74632b03fbe217b8de_3713_1',\n",
337+
"{'msg_id': '30c03d32-82cd57ee2bae596f194f9300_34664_1',\n",
297338
" 'msg_type': 'execute_request',\n",
298339
" 'username': 'jhoward',\n",
299-
" 'session': '95164565-1b052c74632b03fbe217b8de',\n",
300-
" 'date': datetime.datetime(2026, 2, 27, 3, 16, 39, 228656, tzinfo=tzutc()),\n",
340+
" 'session': '30c03d32-82cd57ee2bae596f194f9300',\n",
341+
" 'date': datetime.datetime(2026, 3, 5, 6, 20, 32, 255676, tzinfo=tzutc()),\n",
301342
" 'version': '5.4'}"
302343
]
303344
},
@@ -325,7 +366,15 @@
325366
"execution_count": null,
326367
"id": "afb4f539",
327368
"metadata": {},
328-
"outputs": [],
369+
"outputs": [
370+
{
371+
"name": "stderr",
372+
"output_type": "stream",
373+
"text": [
374+
"Orphan reply for 30c03d32-82cd57ee2bae596f194f9300_34664_2, pending=[]\n"
375+
]
376+
}
377+
],
329378
"source": [
330379
"kc = await km.client().start_channels()"
331380
]
@@ -339,23 +388,23 @@
339388
{
340389
"data": {
341390
"text/plain": [
342-
"{'header': {'msg_id': 'd40943ee-1c9991f3726c7b2c58e4e42c_3719_21',\n",
391+
"{'header': {'msg_id': '3560f722-f0607ae043c870acc8743b8c_34684_21',\n",
343392
" 'msg_type': 'execute_reply',\n",
344393
" 'username': 'jhoward',\n",
345-
" 'session': 'd40943ee-1c9991f3726c7b2c58e4e42c',\n",
346-
" 'date': datetime.datetime(2026, 2, 27, 3, 16, 40, 156085, tzinfo=tzutc()),\n",
394+
" 'session': '3560f722-f0607ae043c870acc8743b8c',\n",
395+
" 'date': datetime.datetime(2026, 3, 5, 6, 20, 33, 234758, tzinfo=tzutc()),\n",
347396
" 'version': '5.4'},\n",
348-
" 'msg_id': 'd40943ee-1c9991f3726c7b2c58e4e42c_3719_21',\n",
397+
" 'msg_id': '3560f722-f0607ae043c870acc8743b8c_34684_21',\n",
349398
" 'msg_type': 'execute_reply',\n",
350-
" 'parent_header': {'msg_id': '95164565-1b052c74632b03fbe217b8de_3713_1',\n",
399+
" 'parent_header': {'msg_id': '30c03d32-82cd57ee2bae596f194f9300_34664_1',\n",
351400
" 'msg_type': 'execute_request',\n",
352401
" 'username': 'jhoward',\n",
353-
" 'session': '95164565-1b052c74632b03fbe217b8de',\n",
354-
" 'date': datetime.datetime(2026, 2, 27, 3, 16, 40, 151952, tzinfo=tzutc()),\n",
402+
" 'session': '30c03d32-82cd57ee2bae596f194f9300',\n",
403+
" 'date': datetime.datetime(2026, 3, 5, 6, 20, 33, 227523, tzinfo=tzutc()),\n",
355404
" 'version': '5.4'},\n",
356-
" 'metadata': {'started': '2026-02-27T03:16:40.153095Z',\n",
405+
" 'metadata': {'started': '2026-03-05T06:20:33.230196Z',\n",
357406
" 'dependencies_met': True,\n",
358-
" 'engine': '45b26be5-508d-4fa0-8397-a8e45704f6da',\n",
407+
" 'engine': 'fce6c486-de7f-4012-87a9-ef36d7d2ec76',\n",
359408
" 'status': 'ok'},\n",
360409
" 'content': {'status': 'ok',\n",
361410
" 'execution_count': 2,\n",
@@ -397,22 +446,11 @@
397446
"execution_count": null,
398447
"id": "0ccc7cca",
399448
"metadata": {},
400-
"outputs": [],
401-
"source": [
402-
"a = kc.execute('x=2', reply=True)\n",
403-
"b = kc.execute('y=3', reply=True)"
404-
]
405-
},
406-
{
407-
"cell_type": "code",
408-
"execution_count": null,
409-
"id": "2739f44c",
410-
"metadata": {},
411449
"outputs": [
412450
{
413451
"data": {
414452
"text/plain": [
415-
"'95164565-1b052c74632b03fbe217b8de_3713_5'"
453+
"'30c03d32-82cd57ee2bae596f194f9300_34664_5'"
416454
]
417455
},
418456
"execution_count": null,
@@ -421,11 +459,27 @@
421459
}
422460
],
423461
"source": [
462+
"a = kc.execute('x=2', reply=True)\n",
463+
"b = kc.execute('y=3', reply=True)\n",
464+
"\n",
424465
"r = await asyncio.wait_for(asyncio.gather(a,b), timeout=2)\n",
425466
"test_eq(len(r), 2)\n",
426467
"r[0]['parent_header']['msg_id']"
427468
]
428469
},
470+
{
471+
"cell_type": "code",
472+
"execution_count": null,
473+
"id": "27a2a287",
474+
"metadata": {},
475+
"outputs": [],
476+
"source": [
477+
"async def g():\n",
478+
" for i in range(10): await kc.execute(f'a{i}={i}; a{i}', reply=True)\n",
479+
"\n",
480+
"r = await asyncio.wait_for(asyncio.gather(g(),g(),g(),g()), timeout=10)"
481+
]
482+
},
429483
{
430484
"cell_type": "code",
431485
"execution_count": null,

0 commit comments

Comments
 (0)