ConKernelClient source

Concurrent-safe Jupyter KernelClient

Imports

from fastcore.test import test_eq
from fastcore.utils import patch

Setup


source

ConKernelClient


def ConKernelClient(
    kwargs:t.Any
)->None:

A KernelClient with async APIs

get_[channel]_msg() methods wait for and return messages on channels, raising :exc:queue.Empty if no message arrives within timeout seconds.


source

ConKernelManager


def ConKernelManager(
    args:Any, kwargs:Any
)->None:

An async kernel manager.

km = ConKernelManager(session=Session(key=b'x'))
await km.start_kernel()
await km.is_alive()
True
kc = await km.client().start_channels()
await kc.is_alive()
True
mid = kc.execute('1+1', reply=False)
mid
'95164565-1b052c74632b03fbe217b8de_3713_1'
@patch
async def get_pubs(self:KernelClient, timeout=0.2):
    "Retrieve all outstanding iopub messages"
    res = []
    try:
        while msg := await self.get_iopub_msg(timeout=timeout): res.append(msg)
    except Empty: pass
    return res
pubs = await kc.get_pubs()
[(o['msg_type'],o['content']) for o in pubs]
[('status', {'execution_state': 'busy'}),
 ('execute_input', {'code': '1+1', 'execution_count': 1}),
 ('execute_result',
  {'data': {'text/plain': '2'}, 'metadata': {}, 'execution_count': 1}),
 ('status', {'execution_state': 'idle'})]
pubs[0]['parent_header']
{'msg_id': '95164565-1b052c74632b03fbe217b8de_3713_1',
 'msg_type': 'execute_request',
 'username': 'jhoward',
 'session': '95164565-1b052c74632b03fbe217b8de',
 'date': datetime.datetime(2026, 2, 27, 3, 16, 39, 228656, tzinfo=tzutc()),
 'version': '5.4'}
kc.stop_channels()
kc = await km.client().start_channels()
r = await kc.execute('2+1', timeout=1, reply=True)
r
{'header': {'msg_id': 'd40943ee-1c9991f3726c7b2c58e4e42c_3719_21',
  'msg_type': 'execute_reply',
  'username': 'jhoward',
  'session': 'd40943ee-1c9991f3726c7b2c58e4e42c',
  'date': datetime.datetime(2026, 2, 27, 3, 16, 40, 156085, tzinfo=tzutc()),
  'version': '5.4'},
 'msg_id': 'd40943ee-1c9991f3726c7b2c58e4e42c_3719_21',
 'msg_type': 'execute_reply',
 'parent_header': {'msg_id': '95164565-1b052c74632b03fbe217b8de_3713_1',
  'msg_type': 'execute_request',
  'username': 'jhoward',
  'session': '95164565-1b052c74632b03fbe217b8de',
  'date': datetime.datetime(2026, 2, 27, 3, 16, 40, 151952, tzinfo=tzutc()),
  'version': '5.4'},
 'metadata': {'started': '2026-02-27T03:16:40.153095Z',
  'dependencies_met': True,
  'engine': '45b26be5-508d-4fa0-8397-a8e45704f6da',
  'status': 'ok'},
 'content': {'status': 'ok',
  'execution_count': 2,
  'user_expressions': {},
  'payload': []},
 'buffers': []}
await kc.get_pubs()
kc.execute('print("orphan")')
await asyncio.sleep(0.3)
slow, fast = await asyncio.gather(
    kc.execute('import time; time.sleep(0.3)', timeout=5, reply=True),
    kc.execute('1+1', timeout=5, reply=True),
    return_exceptions=True)
test_eq(type(slow), dict)
test_eq(type(fast), dict)
a = kc.execute('x=2', reply=True)
b = kc.execute('y=3', reply=True)
r = await asyncio.wait_for(asyncio.gather(a,b), timeout=2)
test_eq(len(r), 2)
r[0]['parent_header']['msg_id']
'95164565-1b052c74632b03fbe217b8de_3713_5'
if await km.is_alive():
    kc.stop_channels()
    await km.shutdown_kernel()