from fastcore.test import test_eq
from fastcore.utils import patch
ConKernelClient source
Concurrent-safe Jupyter KernelClient
Imports
Setup
ConKernelClient
def ConKernelClient(
    **kwargs:t.Any
)->None:
A KernelClient with async APIs
`get_[channel]_msg()` methods wait for and return messages on channels, raising `queue.Empty` if no message arrives within `timeout` seconds.
ConKernelManager
def ConKernelManager(
    *args:Any, **kwargs:Any
)->None:
An async kernel manager.
km = ConKernelManager(session=Session(key=b'x'))
await km.start_kernel()
await km.is_alive()
True
kc = await km.client().start_channels()
await kc.is_alive()
True
mid = kc.execute('1+1', reply=False)
mid
'95164565-1b052c74632b03fbe217b8de_3713_1'
@patch
async def get_pubs(self:KernelClient, timeout=0.2):
"Retrieve all outstanding iopub messages"
res = []
try:
while msg := await self.get_iopub_msg(timeout=timeout): res.append(msg)
except Empty: pass
return respubs = await kc.get_pubs()
[(o['msg_type'],o['content']) for o in pubs]
[('status', {'execution_state': 'busy'}),
('execute_input', {'code': '1+1', 'execution_count': 1}),
('execute_result',
{'data': {'text/plain': '2'}, 'metadata': {}, 'execution_count': 1}),
('status', {'execution_state': 'idle'})]
pubs[0]['parent_header']
{'msg_id': '95164565-1b052c74632b03fbe217b8de_3713_1',
'msg_type': 'execute_request',
'username': 'jhoward',
'session': '95164565-1b052c74632b03fbe217b8de',
'date': datetime.datetime(2026, 2, 27, 3, 16, 39, 228656, tzinfo=tzutc()),
'version': '5.4'}
kc.stop_channels()
kc = await km.client().start_channels()
r = await kc.execute('2+1', timeout=1, reply=True)
r
{'header': {'msg_id': 'd40943ee-1c9991f3726c7b2c58e4e42c_3719_21',
'msg_type': 'execute_reply',
'username': 'jhoward',
'session': 'd40943ee-1c9991f3726c7b2c58e4e42c',
'date': datetime.datetime(2026, 2, 27, 3, 16, 40, 156085, tzinfo=tzutc()),
'version': '5.4'},
'msg_id': 'd40943ee-1c9991f3726c7b2c58e4e42c_3719_21',
'msg_type': 'execute_reply',
'parent_header': {'msg_id': '95164565-1b052c74632b03fbe217b8de_3713_1',
'msg_type': 'execute_request',
'username': 'jhoward',
'session': '95164565-1b052c74632b03fbe217b8de',
'date': datetime.datetime(2026, 2, 27, 3, 16, 40, 151952, tzinfo=tzutc()),
'version': '5.4'},
'metadata': {'started': '2026-02-27T03:16:40.153095Z',
'dependencies_met': True,
'engine': '45b26be5-508d-4fa0-8397-a8e45704f6da',
'status': 'ok'},
'content': {'status': 'ok',
'execution_count': 2,
'user_expressions': {},
'payload': []},
'buffers': []}
await kc.get_pubs()
kc.execute('print("orphan")')
await asyncio.sleep(0.3)
slow, fast = await asyncio.gather(
kc.execute('import time; time.sleep(0.3)', timeout=5, reply=True),
kc.execute('1+1', timeout=5, reply=True),
return_exceptions=True)
test_eq(type(slow), dict)
test_eq(type(fast), dict)
a = kc.execute('x=2', reply=True)
b = kc.execute('y=3', reply=True)
r = await asyncio.wait_for(asyncio.gather(a,b), timeout=2)
test_eq(len(r), 2)
r[0]['parent_header']['msg_id']
'95164565-1b052c74632b03fbe217b8de_3713_5'
if await km.is_alive():
    kc.stop_channels()
    await km.shutdown_kernel()