from fastcore.test import test_eq
from fastcore.utils import patch
ConKernelClient source
Concurrent-safe Jupyter KernelClient
Imports
Setup
ConKernelClient
def ConKernelClient(
    **kwargs:t.Any
)->None:
A KernelClient with async APIs
`get_[channel]_msg()` methods wait for and return messages on channels, raising :exc:`queue.Empty` if no message arrives within `timeout` seconds.
ConKernelManager
def ConKernelManager(
    *args:Any, **kwargs:Any
)->None:
An async kernel manager.
km = ConKernelManager(session=Session(key=b'x'))
await km.start_kernel()
await km.is_alive()
True
kc = await km.client().start_channels()
await kc.is_alive()
True
Orphan reply for 6f154b67-267661d16cf3ad11f27d848c_93430_1, pending=[]
mid = kc.execute('1+1', reply=False)
mid
'6f154b67-267661d16cf3ad11f27d848c_93430_1'
@patch
async def get_pubs(self:KernelClient, timeout=0.2):
"Retrieve all outstanding iopub messages"
res = []
try:
while msg := await self.get_iopub_msg(timeout=timeout): res.append(msg)
except Empty: pass
return respubs = await kc.get_pubs()
[(o['msg_type'],o['content']) for o in pubs]
[('status', {'execution_state': 'busy'}),
('execute_input', {'code': '1+1', 'execution_count': 1}),
('execute_result',
{'data': {'text/plain': '2'}, 'metadata': {}, 'execution_count': 1}),
('status', {'execution_state': 'idle'})]
pubs[0]['parent_header']
{'msg_id': '6f154b67-267661d16cf3ad11f27d848c_93430_1',
'msg_type': 'execute_request',
'username': 'jhoward',
'session': '6f154b67-267661d16cf3ad11f27d848c',
'date': datetime.datetime(2026, 3, 14, 21, 35, 2, 728622, tzinfo=tzutc()),
'version': '5.4'}
kc.stop_channels()
kc = await km.client().start_channels()
Orphan reply for 6f154b67-267661d16cf3ad11f27d848c_93430_2, pending=[]
r = await kc.execute('2+1', timeout=1, reply=True)
r
{'header': {'msg_id': 'b9a8b1e6-23d7634575e20bd3ebf8459a_93439_21',
'msg_type': 'execute_reply',
'username': 'jhoward',
'session': 'b9a8b1e6-23d7634575e20bd3ebf8459a',
'date': datetime.datetime(2026, 3, 14, 21, 35, 3, 696363, tzinfo=tzutc()),
'version': '5.4'},
'msg_id': 'b9a8b1e6-23d7634575e20bd3ebf8459a_93439_21',
'msg_type': 'execute_reply',
'parent_header': {'msg_id': '6f154b67-267661d16cf3ad11f27d848c_93430_1',
'msg_type': 'execute_request',
'username': 'jhoward',
'session': '6f154b67-267661d16cf3ad11f27d848c',
'date': datetime.datetime(2026, 3, 14, 21, 35, 3, 690150, tzinfo=tzutc()),
'version': '5.4'},
'metadata': {'started': '2026-03-14T21:35:03.691788Z',
'dependencies_met': True,
'engine': '49ae4c00-24a1-4fe3-a807-05d45755418f',
'status': 'ok'},
'content': {'status': 'ok',
'execution_count': 2,
'user_expressions': {},
'payload': []},
'buffers': []}
await kc.get_pubs()
kc.execute('print("orphan")')
await asyncio.sleep(0.3)
slow, fast = await asyncio.gather(
kc.execute('import time; time.sleep(0.3)', timeout=5, reply=True),
kc.execute('1+1', timeout=5, reply=True),
return_exceptions=True)
test_eq(type(slow), dict)
test_eq(type(fast), dict)
a = kc.execute('x=2', reply=True)
b = kc.execute('y=3', reply=True)
r = await asyncio.wait_for(asyncio.gather(a,b), timeout=2)
test_eq(len(r), 2)
r[0]['parent_header']['msg_id']
'6f154b67-267661d16cf3ad11f27d848c_93430_5'
async def g():
    "Run ten `execute` requests one after another on the shared client `kc`, awaiting each reply"
    for i in range(10): await kc.execute(f'a{i}={i}; a{i}', reply=True)
r = await asyncio.wait_for(asyncio.gather(g(),g(),g(),g()), timeout=10)
if await km.is_alive():
    kc.stop_channels()
    await km.shutdown_kernel()