safepyrun

Safe(ish) running of python code
print(await view_dlg('00_core.ipynb'))
<msgs><code id="_955b9784">#| default_exp core</code><note id="_0aafe008"># safepython</note><code id="_468aa264" export>from fastcore.utils import *
from fastcore.meta import delegates
from fastcore.xtras import asdict
from fastcore.xdg import xdg_config_home
from fastcore.docments import MarkdownRenderer
from pyskills import *
from pyskills import __pytools__

from inspect import currentframe,Parameter,signature
from contextvars import ContextVar
from types import MappingProxyType

import linecache,builtins,inspect,types,subprocess,ast,asyncio</code><code id="_af5b0967">from fastcore.test import expect_fail,test_eq
from fastcore.test import test_eq, test_fail
import string</code><note id="_b91c2756">## Helpers and setup</note><code id="_16c25d79" export># ContextVar fallback for when stack walking fails (e.g. inside asyncio.gather)
_rp_globals = ContextVar('_rp_globals', default=None)

def _find_frame_dict(sentinel:str):
    "Globals dict of the nearest frame (from the caller's caller outward) that defines `sentinel`, with fallbacks"
    # Skip this frame and the direct caller (e.g. `find_var`); the search begins one level above that
    start = currentframe().f_back.f_back
    # No sentinel: nothing to search for, so hand back the starting frame's globals
    if not sentinel: return start.f_globals
    fr = start
    while fr is not None:
        ns = fr.f_globals
        if sentinel in ns: return ns
        fr = fr.f_back
    # Stack walking found nothing: try the globals stored in the ContextVar (e.g. from asyncio.gather),
    # and as a last resort use this module's own globals
    stored = _rp_globals.get()
    return stored if stored and sentinel in stored else globals()
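</code><note id="_d48e0e99">`_find_frame_dict` walks the call stack, starting two frames above itself, looking for a frame whose globals contain `sentinel`. This lets `RunPython` find the caller's namespace without requiring an explicit globals dict. If no frame contains the sentinel, it falls back first to the globals dict stored in the `_rp_globals` ContextVar (when that dict contains the sentinel), and finally to its own module globals. If `sentinel` is empty, it returns the starting frame's globals without searching.</note><code id="_fa15f5b0">_test_sentinel = True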
d = _find_frame_dict('_test_sentinel')
assert '_test_sentinel' in d
d2 = _find_frame_dict('nonexistent_sentinel_xyz')
assert d2 is not None</code><code id="_99383e45" export>def find_var(var:str):
    "Search for var in all frames of the call stack"
    return _find_frame_dict(var)[var]</code><code id="_2a3ff1c2">find_var('_test_sentinel')</code><note id="_4af40cc9">## Builtins and wrappers</note><code id="_14a70f13" export>from fastaudit import *</code><code id="_de1fbbd5" export>mon_disable_policy = dict(callees=set(), callers=set(), pairs=set(), preds=(),
    callee_prefixes=(), callee_suffixes=('.dumps','.loads'),
    caller_prefixes=(), caller_suffixes=())

def freeze_mon_policy(p):
    "Freeze monitoring policy for one run"
    return dict(callees=frozenset(p.get('callees', ())), callers=frozenset(p.get('callers', ())),
        pairs=frozenset(p.get('pairs', ())), preds=tuple(p.get('preds', ())),
        callee_prefixes=tuple(p.get('callee_prefixes', ())), callee_suffixes=tuple(p.get('callee_suffixes', ())),
        caller_prefixes=tuple(p.get('caller_prefixes', ())), caller_suffixes=tuple(p.get('caller_suffixes', ()))
    )</code><note id="_fe4ad32c">`mon_disable_policy` is mutable module-level configuration for cheap `sys.monitoring` DISABLE decisions. `freeze_mon_policy` snapshots it for a single run so policy checks are stable while code executes.</note><code id="_2b3a8099" export>mon = sys.monitoring
def on_call(caller, callee, fn, code, off, data, calls):
    "Fast monitoring callback to decide if event should be DISABLEd"
    p = data['mon_policy']
    if callee in p['callees'] or caller in p['callers'] or (caller,callee) in p['pairs']: return mon.DISABLE
    if callee.startswith(p['callee_prefixes']) or callee.endswith(p['callee_suffixes']): return mon.DISABLE
    if caller.startswith(p['caller_prefixes']) or caller.endswith(p['caller_suffixes']): return mon.DISABLE
    if any(o(caller, callee, fn, code, off, calls) for o in p['preds']): return mon.DISABLE</code><note id="_f05a7b5d">`on_call` uses only caller/callee names, call pairs, prefixes/suffixes, and optional predicates to decide whether this call site can be disabled for performance. Since fastaudit raises audit events for sys.monitoring c-call events, they can also be checked there when args/kwargs policy checks are needed.</note><code id="_2af8b7f2" export>def frame_args(fr, obj=None):
    "Introspection helper used before audit denies an op; check whether it happened inside an approved call"
    c = fr.f_code
    n,nkw = c.co_argcount,c.co_kwonlyargcount
    args = [fr.f_locals[o] for o in c.co_varnames[:n] if o in fr.f_locals]
    if obj is not None and args and args[0] is obj: args = args[1:]
    kw = {o:fr.f_locals[o] for o in c.co_varnames[n:n+nkw] if o in fr.f_locals}
    i = n+nkw
    if c.co_flags & inspect.CO_VARARGS:
        args += list(fr.f_locals.get(c.co_varnames[i], ()))
        i += 1
    if c.co_flags & inspect.CO_VARKEYWORDS: kw.update(fr.f_locals.get(c.co_varnames[i], {}))
    return args,kw</code><note id="_5a2a2e04">Check on a plain function, including normal positional arguments, keyword-only arguments, and extra `**kwargs`:</note><code id="_1dd53f6c">def some_tool(path, text, *, overwrite=False, **kwargs): return frame_args(currentframe())
args,kwargs = some_tool('path', 'text', overwrite=True, a='b')
test_eq(args, ['path', 'text'])
test_eq(kwargs, {'overwrite': True, 'a': 'b'})</code><note id="_abaf626e">With methods removes `self` from the returned positional arguments:</note><code id="_5fd61cfd">class F:
    def some_meth(self, path, text, *, overwrite=False, **kwargs): return frame_args(currentframe(), self)
args,kwargs = F().some_meth('path', 'text', overwrite=True, a='b')
test_eq(args, ['path', 'text'])
test_eq(kwargs, {'overwrite': True, 'a': 'b'})</code><code id="_ae19467f" export>def _ctx_check(s, name, obj, a, kw, data):
    "Check a pytool entry set for name, including validator tuples"
    if ... in s or name in s: return True
    for x in s:
        if isinstance(x, tuple) and x[0] == name:
            a = list(a[1:] if obj is not None and a and a[0] is obj else a)
            x[1](obj, a, kw, data)
            return True
    return False

def _call_allowed(c, data):
    "Check one logical call against registered pytools; True if a module- or class-level entry allows it"
    pytools = data['pytools']
    mod = sys.modules.get(c.module)
    # `qualname` may be None (it is the `CallInfo` default); normalise it once so both the name
    # derivation and the `'.' in qn` test below work, and such a call is denied rather than crashing
    qn = c.qualname or ''
    nm = getattr(c.fn, '__name__', None) or qn.rsplit('.', 1)[-1]
    if not nm: return False
    # Module-level registration: the module itself is a key in the registry
    if mod in pytools and _ctx_check(pytools[mod], nm, None, c.args, c.kwargs, data): return True
    if mod and '.' in qn:
        # Dotted qualname: look up the owner named by the prefix on the module, then check it and its MRO
        cls = getattr(mod, qn.rsplit('.', 1)[0], None)
        # Only treat the first positional argument as the bound object when it is an instance of that owner
        obj = c.args[0] if cls and c.args and isinstance(c.args[0], cls) else None
        for o in [cls] + list(getattr(cls, '__mro__', ())):
            if o in pytools and _ctx_check(pytools[o], nm, obj, c.args, c.kwargs, data): return True
    return False

def _ctx_allowed(info):
    "Check all logical calls in deny info against registered pytools"
    return any(_call_allowed(c, info.data) for c in info.calls)</code><note id="_31c457fd">`_ctx_check` checks one pytool registration set for a matching name. It returns `True` for direct name matches or allow-all (`...`), and for validator tuples it calls the validator with `(obj, args, kwargs, data)` before returning. It exists so module, class, tracked-call, and frame-call checks all share the same validator logic.

`_call_allowed` checks one logical `CallInfo` against the pytool registry. It returns `True` when the call is allowed either at module level or class/MRO level, including constrained validator entries. It exists to centralize pytool lookup for both async tracked calls and stack-frame fallback calls.

`_ctx_allowed` checks every logical call in a `DenyInfo`. It returns `True` if any tracked or frame-derived call is registered as allowed. It exists so `before_deny` no longer duplicates stack walking or call matching logic.</note><code id="_53fe97bf" export>class RawDenyInfo:
    def __init__(self, args, frame, msg, calls, frame_args): store_attr()

class CallInfo:
    "One logical call: the callable (if known), its arguments, and where it was found"
    def __init__(self, fn=None, args=(), kwargs=None, module=None, qualname=None, name=None, frame=None, source=None):
        # A missing or empty kwargs is replaced by a fresh dict before being stored
        if not kwargs: kwargs = {}
        # Build the dotted name only when none was given and both parts are available
        if name is None:
            if module and qualname: name = f'{module}.{qualname}'
        store_attr()

class DenyInfo:
    def __init__(self, event, args, frame, msg, data, calls, frame_args):
        "Collect tracked and frame-derived calls for one audit event; the first call supplies `call`/`args`/`kwargs`"
        self.event = event
        self.data = data
        # Low-level audit details stay reachable under `raw`
        self.raw = RawDenyInfo(args, frame, msg, calls, frame_args)
        self.tracked_calls = L(calls).map(self._tracked_call)
        self.frame_calls = L(self._frame_calls())
        # Tracked calls are listed before frame-derived ones, so they win when picking `call`
        self.calls = self.tracked_calls + self.frame_calls
        self.call = first(self.calls, None)
        if self.call: self.args,self.kwargs = self.call.args,self.call.kwargs
        else: self.args,self.kwargs = (),{}

    def _tracked_call(self, c): return CallInfo(c.fn, c.args, c.kwargs, c.module, c.qualname, c.name, source='tracked')
    def find(self, name): return first(o for o in self.calls if o.name == name or o.qualname == name)

    def _frame_calls(self):
        fr = self.raw.frame
        while fr:
            mod = sys.modules.get(fr.f_globals.get('__name__'))
            qn = fr.f_code.co_qualname
            try: a,kw = self.raw.frame_args(fr)
            except Exception: a,kw = (),{}
            yield CallInfo(args=a, kwargs=kw, module=getattr(mod, '__name__', None), qualname=qn, frame=fr, source='frame')
            fr = fr.f_back</code><note id="_8cd04000">`DenyInfo` is the public object passed to `pre_deny`. It exposes `event`, `data`, `call`, `args`, `kwargs`, `calls`, `tracked_calls`, and `frame_calls`, with raw audit details under `raw`. It exists to make policy callbacks ergonomic while preserving access to lower-level audit information when needed.</note><code id="_d0468fcc" export>def before_deny(event, args, frame, msg, data, calls, pre_deny=None, _frame_args=frame_args):
    "Check whether a possibly-denied audit event happened inside an approved call."
    info = DenyInfo(event, args, frame, msg, data, calls, _frame_args)
    if pre_deny and (pre := pre_deny(info)) is not None: return pre
    if _ctx_allowed(info): return True</code><note id="_a7b3ba59">`before_deny` is the audit-policy adapter called by `fastaudit`. It builds a `DenyInfo`, gives `pre_deny(info)` first refusal, then falls back to registered pytool checks via `_ctx_allowed`. It returns `True` to allow the audited operation, or `None` to leave it denied by the audit layer.</note><note id="_17f4ab6e">Check direct allow-by-name:</note><code id="_152c1a94">tst_data = dict(pytools={sys.modules[__name__]: {'_bd_allowed'}}, ok_dests=set())
def _bd_allowed(): return before_deny(None, None, currentframe(), None, data=tst_data, calls=[])
def _bd_denied (): return before_deny(None, None, currentframe(), None, data=tst_data, calls=[])

assert _bd_allowed()
assert not _bd_denied()</code><note id="_989a7bc6">Check the registered method passes its object, recovered args, kwargs, and allowed destinations:</note><code id="_1673977b">_seen = []
def _bd_check(obj, args, kw, data): _seen.append((obj, args, kw, data['ok_dests']))

class _BDT:
    def save(self, path, *, overwrite=False): return before_deny(None, None, currentframe(), None, data=tst_data, calls=[])
tst_data = dict(pytools={_BDT: [('save', _bd_check)]}, ok_dests={'/tmp'})

t = _BDT()
test_eq(t.save('/tmp/x', overwrite=True), True)
test_eq(_seen, [(t, ['/tmp/x'], dict(overwrite=True), {'/tmp'})])</code><note id="_a94ced07">## Main implementation</note><code id="_12b831de" export>def srcfn(src):
    "Stores src in linecache under <pyrun_{i%10}>, returns the name." 
    linecache.cache[fn] = (len(src), None,  src.splitlines(keepends=True), fn:=f'<pyrun_{srcfn.i}>')
    srcfn.i = (srcfn.i + 1)%10
    return fn
srcfn.i=0</code><code id="_d159c2d1">srcfn(''),srcfn('')</code><code id="_7dad9339" export>_builtins = dict(builtins.__dict__)

async def __run_python(code:str, g=None, ok_dests=None):
    _rp_globals.set(g)
    rg = g | dict(__builtins__=_builtins, __name__='<pyrun>')
    loc = {}
    async def run(src, is_exec=True):
        comp = compile(src, srcfn(src), 'exec' if is_exec else 'eval', flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT)
        r = eval(comp, rg, loc)
        return (await r) if inspect.isawaitable(r) else r
    tree = ast.parse(code)
    warnings.filterwarnings('ignore', category=SyntaxWarning)
    res = None
    if tree.body and isinstance(tree.body[-1], ast.Expr):
        last = tree.body.pop()
        if tree.body:
            await run(ast.unparse(ast.Module(tree.body, [])))
            rg.update(loc) # generators resolve inner vars from globals, so we need to copy the names assigned so far from `loc` into `rg`
        res = await run(ast.unparse(ast.Expression(last.value)), False)
    else: await run(code)
    g.update(loc)
    return res</code><note id="_b3932b1f">`__run_python` executes user code with separate globals (`rg`) and locals (`loc`) mappings. Earlier top-level assignments are written into `loc`. Most final expressions can still see those names because they are evaluated with the same locals mapping.

Generator expressions are different: they run in their own nested scope. When the generator body later resolves a name assigned by an earlier statement, it looks in the globals mapping (`rg`), not the outer `eval` locals mapping (`loc`). So code like `x = 10; sum(i+x for i in [1,2])` failed with `NameError` because `x` was only in `loc`.

After executing the non-final statements, `rg.update(loc)` copies those assigned names into the globals mapping before evaluating the final expression. This preserves the notebook-style behavior where names defined earlier in the cell are visible inside generators, comprehensions, lambdas, and other nested scopes used by the final expression.</note><code id="_0cee0fc7">await __run_python(
    "rnd_var_name_123 = 10\n"
    "sum(x + rnd_var_name_123 for x in [1, 2])",
    g={}
)</code><note id="_1b71622e">`_run_python` builds the per-run policy bundle passed to `fastaudit`, including approved pytools, allowed destinations, and the frozen monitoring policy snapshot.</note><code id="_0f9e4c14" export>allow_imports = set()</code><code id="_7ccc8d88" export>async def _wait_bg(before):
    "Wait for tasks created since `before`; surface their failures."
    cur = asyncio.current_task()
    excs = []
    while ts := [t for t in asyncio.all_tasks()-before if t is not cur and not t.done()]:
        excs += [r for r in await asyncio.gather(*ts, return_exceptions=True)
                 if isinstance(r, BaseException) and not isinstance(r, asyncio.CancelledError)]
    if excs: raise first(excs, risinstance(PermissionError)) or excs[0]</code><code id="_d79ccebc" export>async def _run_python(code:str, g=None, ok_dests=None, pre_deny=None, **kwargs):
    _rp_globals.set(g)
    data = dict(pytools=MappingProxyType({k:frozenset(v) for k,v in __pytools__.items()}),
        ok_dests=ok_dests or (), mon_policy=freeze_mon_policy(mon_disable_policy)) | kwargs
    denyf = partial(before_deny, pre_deny=pre_deny)
    before = asyncio.all_tasks()
    with mk_audit(ok_dests, before_deny=denyf, data=data, allow_imports=frozenset(allow_imports), on_call=on_call)():
        res = await __run_python(code=code, g=g, ok_dests=ok_dests)
    await _wait_bg(before)
    return res</code><note id="_ecf3c2a4">`_run_python` snapshots `asyncio.all_tasks()` before running and waits for any new tasks (including tasks they spawn in turn) before returning, so a tool call only completes when all its background work has. If a background task fails, the tool call raises that error. Tasks spawned by tool code with `asyncio.create_task` would otherwise outlive the audit context.</note><code id="_6cf7bdf6">res = []
async def bg_job():
    await asyncio.sleep(0.1)
    res.append('finished')

r = await _run_python("asyncio.create_task(bg_job())\n'returned'", g=dict(asyncio=asyncio, bg_job=bg_job), ok_dests=())
test_eq(r, 'returned')
test_eq(res, ['finished'])  # bg task completed before the call returned</code><code id="_5656cead">async def bg_bad():
    await asyncio.sleep(0.01)
    subprocess.run(['ls'])
    
with expect_fail(PermissionError, 'subprocess.Popen blocked'):
    await _run_python("asyncio.create_task(bg_bad())", g=dict(asyncio=asyncio, bg_bad=bg_bad), ok_dests=())</code><code id="_5d38a1d0" export>default_ok_dests = ()</code><code id="_91b321ca" export>def _check_user_code(tree, ban_imports, ban_defs):
    "Reject user code that imports banned modules, defines names, mentions importlib, or calls exec/eval/compile"
    for n in ast.walk(tree):
        if ban_defs and isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            raise PermissionError(f"def/class not allowed: {n.name}")
        if isinstance(n, ast.Import):
            for a in n.names:
                if a.name in ban_imports: raise PermissionError(f"import {a.name!r} not allowed")
        elif isinstance(n, ast.ImportFrom):
            if n.module and n.module in ban_imports: raise PermissionError(f"from {n.module!r} import not allowed")
        elif isinstance(n, ast.Call) and isinstance(n.func, ast.Name) and n.func.id in {'exec','eval','compile'}:
            raise PermissionError(f"{n.func.id}() not allowed")
        elif isinstance(n, ast.Name) and n.id=='importlib': raise PermissionError("importlib not allowed")</code><note id="_8daedb2b">We have some extra rules based on the raw code being run (as opposed to the code inside the implementations being called):
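- No imports of banned modules (by default `socket` and `importlib`), and no use of the name `importlib`
- No calls to `exec`, `eval`, or `compile`
- No defining functions or classes (unless `ban_defs=False`).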
`_check_user_code` is responsible for these rules.</note><code id="_5447b52c" export>def _find_perm_err(e):
    "Walk the exception chain looking for a PermissionError"
    pe = e
    while pe:
        if isinstance(pe, PermissionError): return pe
        pe = pe.__cause__ or pe.__context__
    return e</code><note id="_9713d9e3">Some libs (like `httpcore`) wrap our permission errors, so we can't handle them directly. We unwrap the error in that situation.</note><code id="_5f44108c" export>class RunPython:
    """Execute Python with audit-hook safety checks and access to LLM tools, returning last expression.
    `import` works in the usual way. All builtins are available.
    Multiline code blocks can be used, including defining functions and variables.
    **NB**: Locals are exported back to the caller's namespace."""

    def __init__(self, g=None, sentinel=None, ok_dests=UNSET, ban_imports=frozenset({'socket','importlib'}), ban_defs=True,
        pre_deny=None, **kwargs):
        "Resolve the namespace to run in (explicit `g`, else IPython's `user_ns`, else a stack-frame lookup) and store policy options"
        if g is None:
            # `get_ipython` may be undefined (NameError) -- presumably when not running under IPython
            try: shell = get_ipython()
            except NameError: shell = None
            g = getattr(shell, 'user_ns', None)
            # Keep this call directly in `__init__`: `_find_frame_dict` counts frames relative to its caller
            if g is None: g = _find_frame_dict(sentinel)
        self.g = g
        # An unset `ok_dests` takes the module-level default; any falsy value becomes an empty tuple
        self.ok_dests = (default_ok_dests if ok_dests is UNSET else ok_dests) or ()
        self.ban_imports = ban_imports
        self.ban_defs = ban_defs
        self.pre_deny = pre_deny
        self.kwargs = kwargs

    async def __call__(self, code:str):
        try:
            _check_user_code(ast.parse(code), ban_imports=self.ban_imports, ban_defs=self.ban_defs)
            return await _run_python(code, g=self.g, ok_dests=self.ok_dests, pre_deny=self.pre_deny, **self.kwargs)
        except Exception as e:
            e = _find_perm_err(e)
            tb = e.__traceback__
            while tb.tb_next and not tb.tb_frame.f_code.co_filename.startswith('<pyrun'): tb = tb.tb_next
            raise e.with_traceback(tb) from None</code><note id="_f1b8972a">`RunPython` is the public API. It captures the caller's globals via `_find_frame_dict`, optionally takes `ok_dests` for write-checking, and executes code under the audit-hook policy.</note><code id="_d0bd7086">pyrun = RunPython()</code><code id="_d0e59c71">await pyrun('[]')</code><code id="_725dc98c">async def f(): return 1</code><code id="_8c093ad4">await pyrun('await f()')</code><note id="_305dae7c">Passing an empty dict as the namespace results in a clean environment:</note><code id="_2c31cd80">g = {}
await RunPython(g=g)('_ns_test_var = 42')
test_eq(list(g.keys()), ['_ns_test_var'])
assert '_ns_test_var' not in globals()</code><code id="_9105f690" export>@delegates(RunPython)
def create_pyrun_magic(shell=None, pyrun=None, **kwargs):
    "Create magic"
    if not shell: shell = get_ipython()
    if not pyrun: pyrun = RunPython(**kwargs)
    def f(line, cell=None):
        if line=='-o': return pyrun
        if not cell: return
        return pyrun(cell)
    shell.register_magic_function(f, 'line_cell', 'py')</code><code id="_5da01116">create_pyrun_magic()</code><code id="_8f1290cc">%%py
print('tt')</code><code id="_8210f966">%%py
type('t')</code><code id="_2fdebd30">%%py
a = 1
a+=2
a</code><note id="_d8c2039b">Unpacking is allowed:</note><code id="_2718662d">%%py
a = [1,2,3]
print(*a)</code><code id="_9f0b2705">def f(): warnings.warn('a warning')</code><code id="_9c90274e">%%py
print("asdf")
f()
1+1</code><note id="_e4242050">Classes and functions can not be created:</note><code id="_885bb2d7">with expect_fail(PermissionError):
    await pyrun('class A:\n    def __init__(self): print("hi")')

with expect_fail(PermissionError):
    await pyrun('def f(): print("hi")')</code><code id="_7a7fdd13">with expect_fail(PermissionError): await pyrun('os.system("ls")')</code><code id="_4834cc35">%%py
print(os.unlink)
print(type(os.unlink))
print(os.unlink.__qualname__)</code><code id="_fad5d28c">class C: ...
c = C()</code><code id="_5e4565da">%%py
isinstance(C, type)</code><code id="_020634d9">%%py
isinstance(c, type)</code><note id="_8155a50d">### `allow_write_types`</note><code id="_de2cd76e">o = SimpleNamespace(x=1)

await pyrun('''
d = {}
d["x"] = 1
o.x = 2
d["x"],o.x''')</code><code id="_2324abe0" export>_path_wp = PathWritePolicy()
_dst1 = PosAllowPolicy(1, 'dst')
_rename_wp = PathWritePolicy(target_pos=0, target_kw='target')</code><note id="_afeb38c8">## Config</note><note id="_a33fbd3b">`safepyrun` loads an optional user config from `{xdg_config_home}/safepyrun/config.py` at import time, after all defaults are registered. This lets users permanently extend the sandbox allowlists without modifying the package. The config file is executed with all `safepyrun.core` globals already available — no imports needed. This includes `allow`, `allow_write_types`, `AllowPolicy`, `PathWritePolicy`, `PosAllowPolicy`, `OpenWritePolicy`, and all standard library modules already imported by the module.

Example `~/.config/safepyrun/config.py` (Linux) or `~/Library/Application Support/safepyrun/config.py` (macOS):

```python
# Add pandas tools
allow({pandas.DataFrame: ['head', 'describe', 'info', 'shape']})
```

If the config file has errors, a warning is emitted and the defaults remain intact.</note><code id="_e2c7e67d" export>_cfg_py = xdg_config_home() / 'safepyrun' / 'config.py'
if _cfg_py.exists():
    try:
        _cfg_ns = {k:v for k,v in globals().items() if not k.startswith('_')}
        exec(_cfg_py.read_text(), _cfg_ns)
        # TODO: Need to update this for the new set-data/allow API
        if 'default_ok_dests' in _cfg_ns: globals()['default_ok_dests'] = _cfg_ns['default_ok_dests']
    except Exception as e: warnings.warn(f"Failed to load {_cfg_py}: {e}")</code><note id="_5d8bbd88">## Examples</note><code id="_6debe6b5">%%py
a = {"b":1}
list(a.items())</code><code id="_ea89fdcd">%%py
Path().exists()</code><code id="_c59e4bbe">%%py
os.path.join('/foo', 'bar', 'baz.py')</code><code id="_84f1e246">%%py
a_=3</code><code id="_f863678c">#| eval: false
a_</code><code id="_51bc23b1">%%py
aa_='33'</code><code id="_8e9c3558">%%py
len(aa_)</code><code id="_a4990482">def g(): ...</code><code id="_131fa624">%%py
inspect.getsource(g)</code><code id="_4d8fc1d9">with expect_fail(PermissionError): await pyrun("os.unlink('/foo/bar')")</code><code id="_1be51d80">async def agen():
    for x in [1,2]: yield x</code><code id="_2d974790">%%py
res = []
async for x in agen(): res.append(x)
res</code><code id="_64247c3c">import asyncio
async def fetch(n): return n * 10</code><code id="_654e0dbb">%%py
print(string.ascii_letters)
await asyncio.gather(fetch(1), fetch(2), fetch(3))</code><code id="_e5ac6304">import numpy as np</code><code id="_051a7ba4">%%py
np.array([1,2,3]).sum()</code><note id="_e9fcdbf1">### Allow policy examples</note><code id="_d7cb9b15">pyrun2 = RunPython(ok_dests=['/tmp'])</code><code id="_3a0e91e9">await pyrun2("Path('/tmp/test_write.txt').write_text('hello')")</code><code id="_6dc36d93">await pyrun2("open('/tmp/test_open.txt', 'w').write('hi')")</code><code id="_f426e9de">with expect_fail(PermissionError): await pyrun2("Path('/etc/evil.txt').write_text('bad')")
with expect_fail(PermissionError): await pyrun2("open('/root/bad.txt', 'w')")</code><code id="_19a9ba34">await pyrun2("open('/etc/passwd', 'r').read(10)")</code><code id="_5989bc35">await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/tmp/test_copy.txt')")</code><code id="_09cddc82">with expect_fail(PermissionError): await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/root/bad.txt')")
with expect_fail(PermissionError): await pyrun("Path('/tmp/test.txt').write_text('nope')")</code><code id="_0ee640d4">pyrun_cwd = RunPython(ok_dests=['.'])

# Writing to cwd should work
await pyrun_cwd("Path('test_cwd_ok.txt').write_text('hello')")</code><code id="_e9b0edea">with expect_fail(PermissionError): await pyrun("Path('test_cwd_ok.txt').write_text('hello')")</code><code id="_20b5a7f2">Path('test_cwd_ok.txt').unlink(missing_ok=True)</code><code id="_f00287f1"># Writing to /tmp should be blocked (not in ok_dests)
with expect_fail(PermissionError): await pyrun_cwd("Path('/tmp/nope.txt').write_text('bad')")</code><code id="_3f076b4d"># Parent traversal should be blocked
with expect_fail(PermissionError): await pyrun_cwd("Path('../escape.txt').write_text('bad')")</code><code id="_43e30169"># Sneaky traversal via subdir/../../ should also be blocked
with expect_fail(PermissionError): await pyrun_cwd("Path('subdir/../../escape.txt').write_text('bad')")</code><note id="_74d66403">## `allow`</note><code id="_929b33a5">def trusted_echo(): return subprocess.run(['echo', 'hi'], capture_output=True, text=True)</code><code id="_3285f58a">with expect_fail(PermissionError): await pyrun("trusted_echo().stdout")
with expect_fail(PermissionError): await pyrun("import subprocess; subprocess.run(['echo', 'hi'])")</code><code id="_81b73411">allow(trusted_echo)
test_eq((await pyrun("trusted_echo().stdout")), "hi\n")
with expect_fail(PermissionError): await pyrun("import subprocess; subprocess.run(['echo', 'hi'])")</code><code id="_7e941dbf">class _MethT:
    def echo (self): return run('echo hi')
    def echo2(self): return run('echo hi')

t = _MethT()
with expect_fail(PermissionError): await pyrun("t.echo()")
with expect_fail(PermissionError): await pyrun("t.echo2()")
allow(_MethT.echo)
test_eq(await pyrun("t.echo()"), "hi")

allow({_MethT:['echo2']})
test_eq(await pyrun("t.echo2()"), "hi")</code><code id="_35b7b510">@patch
def echo3(self:_MethT): return run('echo hi')
with expect_fail(PermissionError): await pyrun("t.echo3()")
allow({_MethT:['echo3']})
test_eq(await pyrun("t.echo3()"), "hi")</code><code id="_bb23ba62">import httpx</code><code id="_37b7d46d">with expect_fail(PermissionError): await pyrun('httpx.get("http://example.org")')</code><code id="_d0771c6b">@allow
def getexample(): return httpx.get('http://example.org')</code><code id="_9aa4cb66">await pyrun('getexample()')</code><code id="_1acf06e4">def testevent():
    sys.audit('pyrun.testevent')
    return 'ok'</code><code id="_006232b9">with expect_fail(PermissionError): await pyrun('testevent()')</code><code id="_5d797ec9">@allow
def testevent2():
    return testevent()</code><code id="_076ec64a">%%py
testevent2()</code><code id="_5661ac4c">def chk_url(obj, args, kw, data):
    url = args[0] if args else kw.get('url','')
    if url not in data.get('ok_urls', ()): raise PermissionError(url)</code><code id="_c845aa94">pyrun_urls = RunPython(ok_urls={'http://example.org'})</code><code id="_4fc5d0b0">allow({httpx.get: chk_url})
with expect_fail(PermissionError): await pyrun_urls('httpx.get("http://example.com")')
await pyrun_urls('httpx.get("http://example.org")')</code><code id="_a49a4700">@allow
def _httpget(url): return httpx.get(url)</code><code id="_98e6824f">def httpget(url):
    sys.audit('myapp.httpget', url)
    return _httpget(url)</code><code id="_c1ace36b">def url_allow(info):
    if info.event=='myapp.httpget' and info.raw.args[0] in info.data.get('ok_urls', ()): return True</code><code id="_0e736193">pyrun_urls2 = RunPython(pre_deny=url_allow, ok_urls={'http://example.org'})
with expect_fail(PermissionError): await pyrun_urls2('httpget("http://example.com")')
await pyrun_urls2('httpget("http://example.org")')</code><note id="_741cabd2">## Plots</note><code id="_92e10e61" export>def allow_matplotlib():
    import matplotlib.pyplot as plt
    from matplotlib.figure import Figure
    from matplotlib.axes import Axes
    from matplotlib.axis import Axis
    from matplotlib.spines import Spine, SpinesProxy
    from matplotlib.backend_bases import FigureCanvasBase
    from matplotlib.font_manager import FontManager
    from matplotlib.transforms import Bbox

    allow(Bbox.update_from_path, FigureCanvasBase.__init__, FontManager.findfont)
    allow({
        plt: ['plot','figure','axis', 'subplots', 'show', 'tight_layout', 'subplot', 'bar', 'scatter', 'hist',
            'xlabel', 'ylabel', 'title', 'legend', 'grid', 'xlim', 'ylim', 'colorbar', 'imshow'],
        Figure: ['tight_layout', 'set_size_inches', 'add_subplot', 'suptitle'],
        Axes: ['plot', 'bar', 'barh', 'scatter', 'hist', 'set_xlabel', 'set_ylabel', 'set_title',
            'legend', 'grid', 'set_xlim', 'set_ylim', 'tick_params', 'set_xticks', 'set_yticks',
            'annotate', 'text', 'axhline', 'axvline', 'fill_between', 'twinx', 'imshow', 'pie',
            'boxplot', 'errorbar', 'stem', 'loglog', 'semilogx', 'semilogy', 'set_xscale', 'set_yscale'],
        Axis: ['set_major_formatter', 'set_minor_formatter', 'set_major_locator', 'set_minor_locator'],
        Spine: ['set_visible'], SpinesProxy: ['set_visible'],
    })

    _savefig_wp = PosAllowPolicy(0, 'fname')
    allow({Figure: [('savefig', _savefig_wp)], plt: [('savefig', _savefig_wp)]})</code><code id="_28202dd0">import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from matplotlib.axes import Axes
from matplotlib.axis import Axis
from matplotlib.spines import Spine, SpinesProxy</code><code id="_7fb4f5d1">allow_matplotlib()</code><code id="_c94ec416">%%py
fig, ax = plt.subplots(figsize=(3,2), dpi=100)
ax.plot([1,2,1], label='Triangle')
ax.plot([1,1.5,2], label='Rising')
ax.tick_params(labelsize=10)
ax.set_xlabel('X', fontsize=10)
ax.set_ylabel('Y', fontsize=10)
ax.spines[['top','right']].set_visible(False)
ax.yaxis.set_major_formatter('{x:.0f}')
ax.legend(fontsize=12);</code><code id="_7ea5e69c">#| eval: false
pyrun_tmp = RunPython(ok_dests=['/tmp'])
with expect_fail(PermissionError): await pyrun_tmp("fig.savefig(os.path.expanduser('~/plot.png'))")
await pyrun_tmp("fig.savefig('/tmp/plot.png')")</code><code id="_8d1cb417" export>def load_ipython_extension(ip):
    ns = ip.user_ns
    ns['pyrun'] = pyrun = RunPython(g=ns)
    ns['allow'] = allow
    create_pyrun_magic(ip, pyrun)</code><code id="_30964442" export>from fastcore.script import call_parse, Param</code><code id="_74c49b7c" export>@call_parse
def cli(path: Param("Path to script, or '-' for stdin", str, opt=False, nargs='?', default='-')):
    "Run a python script file in the safepyrun sandbox"
    try:
        code = sys.stdin.read() if path == '-' else Path(path).read_text()
        if (r := asyncio.run(RunPython()(code))) is not None: print(r)
    except Exception as e: return not print(f"Error: {e}", file=sys.stderr)</code><note id="_9911c2a6">## export -</note><code id="_e8671c65">#| hide
from nbdev import nbdev_export
nbdev_export()</code></msgs>

safepyrun is an allowlist-based Python sandbox that lets LLMs execute code safely(ish) in your real environment. Instead of isolating code in a container (which cuts it off from the libraries, data, and tools it actually needs) safepyrun runs in-process with controlled access to a curated subset of Python’s stdlib, plus any functions you explicitly opt in.

It’s the Python counterpart to safecmd, which does much the same thing for bash.

Installation

Install from PyPI:

$ pip install safepyrun

Background

When an LLM needs to run code on your behalf, the standard advice is to sandbox it in a container. The problem is that the whole reason you want the LLM running code is so it can interact with your environment – your files, your libraries, your running processes, your data. A containerised sandbox either can’t access any of that, or it requires complex volume mounts and dependency mirroring that recreate your environment inside the container.

You could just exec the LLM’s code directly in your process, which would give full access to everything… but “everything” includes shutil.rmtree, os.remove, subprocess.run("rm -rf /"), etc!

safepyrun takes a middle path. It runs the LLM’s code in your real Python process, with access to your real objects, but interposes an allowlist that controls which callables are accessible. The curated default list covers a large and useful subset of the standard library (string manipulation, math, JSON parsing, path inspection, data structures, and so on) while excluding anything that writes to the filesystem, spawns processes, or modifies system state. You can extend the list for your own functions.

The mechanism behind safepyrun is RestrictedPython, a long-standing project that compiles Python source code into a modified AST (Abstract Syntax Tree) where every attribute access, item access, and iteration is routed through hook functions. This means that when the LLM’s code does obj.method(), it doesn’t go directly to method – it goes through a gatekeeper that checks whether that callable is on the allowlist. The same applies to getattr, getitem, and iter, so there’s no easy way to accidentally reach a dangerous function through indirect access. safepyrun supplies these hook functions, wiring them up to an allowlist of permitted callables.

Because a lot of modern Python code (and many LLM tool-calling frameworks) is async, safepyrun also depends on restrictedpython-async, which extends RestrictedPython to handle await, async for, and async with expressions.

A lot of the online discussion around RestrictedPython suggests it’s not really useful for sandboxing, and that’s true if you’re trying to block a determined adversary. But an LLM is not a determined adversary. It’s a well-meaning but occasionally clumsy collaborator. The threat model is completely different: you don’t need to prevent deliberate escape attempts, you need to make it very unlikely that a hallucinated cleanup step or a misunderstood request causes damage. This is the same “safe-ish” philosophy used in safecmd for bash.

Once you internalise this, the design space opens up. It’s actually fine for the LLM to read files, access the internet via httpx, parse data, and call into your libraries. The things you want to prevent are writes to the filesystem, spawning processes, and overwriting important state. RestrictedPython gives us the mechanism to enforce this: it rewrites the AST to intercept attribute access, iteration, and item access, so that every callable goes through an allowlist check.

The allowlist has two tiers. First, a curated subset of the standard library that has been audited once so every user doesn’t have to repeat the work: things like re, json, itertools, math, collections, pathlib (read-only methods), and many more. Second, user-extended functions registered via allow(), so you can opt in your own project’s functions and methods. Symbols the LLM creates are exported back to the caller’s namespace by default, unless they would shadow an existing callable or module. Names ending with _ (like result_) are always exported, even if they shadow. Exported callables must still be registered with allow() to be callable in subsequent sandbox calls.

Usage

from safepyrun import *
from pyskills import *

The main entry point is pyrun = RunPython(), which returns an async function that takes a string of Python code and executes it in the sandbox. The last expression in the code is returned as the result, and any print() output is captured separately. Errors are caught and reported rather than crashing the caller.

pyrun = RunPython()
await pyrun('1+1')
2

You can mix print() output with a return value. The printed output goes to the stdout key, and the last expression becomes result:

await pyrun('print("hello"); 1+1')
hello
2

Modules can be imported. stderr is also captured:

await pyrun('''
import warnings
warnings.warn('a warning')
"ok"
''')
<pyrun_3>:2: UserWarning: a warning
  warnings.warn('a warning')
'ok'

A large subset of the standard library is available out of the box – things like re, json, math, itertools, collections, pathlib (read-only methods), and many more. These have been audited once so that every user doesn’t have to repeat the work:

await pyrun('import re; re.findall(r"\\d+", "there are 3 cats and 10 dogs")')
['3', '10']

The default allowlist covers text and data processing (re, json, csv, html, textwrap, string, difflib, unicodedata), math and numerics (math, cmath, statistics, decimal, fractions, random, operator), data structures (collections, heapq, bisect, plus methods on all the built-in types), iteration and functional tools (itertools, functools), read-only filesystem access (pathlib, os.path, fnmatch), date and time (datetime, time), URL handling and read-only HTTP (urllib.parse, httpx.get, ipaddress), encoding and serialization (base64, binascii, hashlib, zlib, pickle, struct), introspection (inspect, ast, keyword, sys.getsizeof), XML parsing (xml.etree.ElementTree), and various utilities (contextlib, copy, dataclasses, enum, secrets, uuid, pprint, shlex, colorsys, traceback).

The allow() function

Functions you define yourself or import from third-party packages are not automatically available. If the sandbox encounters an unregistered callable, it raises an error.

To make a function available, register it with allow():

def greet(name): return f"Hello, {name}!"
allow(greet) # Or use @allow decorator
await pyrun('greet("World")')
'Hello, World!'

The same applies to anything you import from PyPI. For instance, if you wanted the LLM to be able to call numpy.array, you would register it with allow('numpy.array').

allow() accepts three forms: callables, strings, and dicts. The simplest form is a single function — passed directly, applied as a decorator, or named by a bare string — which registers a single name. This works for standalone functions in the caller’s namespace:

@allow
def double(x): return x * 2
await pyrun('double(21)')
42

For methods on modules or classes, pass the callable itself (as in the example below) or use dotted string syntax. The string should match how the sandbox will look up the callable, which is ClassName.method or module.function:

import numpy as np
allow(np.array, np.ndarray.sum)
await pyrun('np.array([1,2,3]).sum()')
np.int64(6)

Note that the string must use the actual class or module name as it appears in Python, not the alias. In the example above, even though the sandbox code uses np, the allowlist entry is 'numpy.array' because numpy is the module’s real name.

The dict form is a convenient shorthand for registering multiple methods on the same module or class at once. The key is the actual module or class object, and the value is a list of method name strings:

allow({np.ndarray: ['mean', 'reshape', 'tolist']})
await pyrun('np.array([1,2,3,4]).reshape(2,2).mean()')
np.float64(2.5)

The dict form does two things: it registers the class/module name itself (so it can be called as a constructor or accessed as a namespace), and it registers each ClassName.method pair. You can mix strings and dicts in a single allow() call:

allow('my_func', {np.linalg: ['norm', 'det']})

The _ suffix export convention

All symbols created in the sandbox are exported back to the caller’s namespace by default — unless the name already exists and the new value is callable or a module (to prevent accidental shadowing). Names ending with _ (but not starting with _) are always exported regardless, even if they shadow. Note that exported callables are not automatically available to call in subsequent sandbox runs — they must still be registered with allow() to be callable. Non-callable exports (variables, data structures) are available immediately:

await pyrun('result_ = [x**2 for x in range(5)]')
result_
[0, 1, 4, 9, 16]

The exported symbols are real objects in your namespace:

await pyrun('counts_ = {"a": 1, "b": 2}')
counts_
{'a': 1, 'b': 2}

This is particularly useful in LLM tool loops where the model might need to accumulate results across steps. The _ suffix is only needed when you want to force-export a name that would otherwise be blocked (because it shadows an existing callable or module).

Async support

The sandbox is async-native. If the code being executed contains await, async for, or async with expressions, they work as expected. Many modern Python libraries and LLM tool-calling frameworks are async, and you want the sandbox to be able to call into them without workarounds.

import asyncio
async def fetch(n): return n * 10
await pyrun('''
await asyncio.gather(fetch(1), fetch(2), fetch(3))
''')
[10, 20, 30]

Writable path permissions

By default, RunPython() allows writes to the current working directory (.) and /tmp, and blocks writes elsewhere. You can pass ok_dests to restrict writes to a different set of directory prefixes:

pyrun2 = RunPython(ok_dests=['/tmp'])
from pathlib import Path
await pyrun2("Path('/tmp/test_write.txt').write_text('hello')")
5
try: await pyrun2("Path('/etc/evil.txt').write_text('bad')")
except PermissionError as e: print(f'Blocked: {e}')
Blocked: open '/etc/evil.txt' not in ('/private/tmp',)

The same permission checking applies to open() in write mode, not just Path methods:

await pyrun2("open('/tmp/test_open.txt', 'w').write('hi')")
2
try: await pyrun2("open('/root/bad.txt', 'w')")
except PermissionError as e: print(f'Blocked: {e}')
Blocked: open '/root/bad.txt' not in ('/private/tmp',)

Read access is unaffected — only writes are gated:

await pyrun2("open('/etc/passwd', 'r').read(10)")
'##\n# User '

Higher-level file operations like shutil.copy are also intercepted. The destination is checked against ok_dests:

await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/tmp/test_copy.txt')")
'/tmp/test_copy.txt'
try: await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/root/bad.txt')")
except PermissionError as e: print(f'Blocked: {e}')
Blocked: shutil.copyfile '/root/bad.txt' not in ('/private/tmp',)

By default, RunPython() uses default_ok_dests, which allows writes in . and /tmp but blocks writes elsewhere.

await pyrun("Path('test_default_ok.txt').write_text('ok')")
await pyrun("Path('/tmp/test_default_tmp.txt').write_text('tmp')")

try: await pyrun("Path('/etc/nope.txt').write_text('bad')")
except PermissionError as e: print(f'Default blocked: {e}')

Path('test_default_ok.txt').unlink()
Default blocked: open '/etc/nope.txt' not in ('.', '/private/tmp', '/Users/jhoward/aai-ws')

If you want to disable write protection entirely, pass ok_dests=None:

pyrun_un = RunPython(ok_dests=None)
un_path = Path.home()/'safepyrun-un.txt'
await pyrun_un(f"Path({str(un_path)!r}).write_text('ok')")

You can use '.' to allow writes relative to the current working directory. Path traversal attempts (../, subdir/../../) are detected and blocked, so the sandbox can’t escape the permitted directory:

pyrun_cwd = RunPython(ok_dests=['.'])

# Writing to cwd should work
await pyrun_cwd("Path('test_cwd_ok.txt').write_text('hello')")
5
Path('test_cwd_ok.txt').unlink(missing_ok=True)

Writing to /tmp is blocked here since it’s not in ok_dests:

try: await pyrun_cwd("Path('/tmp/nope.txt').write_text('bad')")
except PermissionError: print("Blocked /tmp as expected")
Blocked /tmp as expected

Parent traversal is blocked if it resolves to a location outside ok_dests:

try: await pyrun_cwd("Path('../escape.txt').write_text('bad')")
except PermissionError: print("Blocked ../ as expected")
Blocked ../ as expected

Write policies

When ok_dests is set, safepyrun uses write policies to determine how to validate each callable’s destination arguments. Three built-in policy classes cover common patterns: checking a positional or keyword argument (PosAllowPolicy), checking the Path object itself (PathWritePolicy), and checking open() calls only when the mode is writable (OpenWritePolicy). You can also subclass AllowPolicy to create custom checks.

The simplest, PosAllowPolicy, checks a specific positional or keyword argument against the allowed destinations. Here, position 1 (or keyword dst) is validated — writing to /tmp is allowed, but /root is blocked:

pp = PosAllowPolicy(1, 'dst')
pp(None, ['src', '/tmp/ok'], {}, dict(ok_dests=['/tmp']))
try: pp(None, ['src', '/root/bad'], {}, dict(ok_dests=['/tmp']))
except PermissionError: print("PosAllowPolicy correctly blocked /root/bad")
PosAllowPolicy correctly blocked /root/bad

You can create custom write policies by subclassing AllowPolicy and implementing __call__. For example, here we show a policy that only allows writes to files with specific extensions — useful if you want the LLM to create .csv or .json files but not arbitrary scripts.

The __call__ signature receives (obj, args, kwargs, ok_dests) where obj is the object the method is called on (e.g. a Path instance), args/kwargs are the method’s arguments, and ok_dests is the list of permitted directory prefixes. Calling chk_dest first handles the directory check, then the custom logic adds the extension constraint on top.

class ExtWritePolicy(AllowPolicy):
    "Only allow writes to paths with specified extensions"
    def __init__(self, exts): self.exts = set(exts)
    def __call__(self, obj, args, kwargs, ok_dests):
        chk_dest(obj, ok_dests)
        if Path(str(obj)).suffix not in self.exts: raise PermissionError(f"{Path(str(obj)).suffix!r} not allowed")
ep = ExtWritePolicy(['.csv', '.json'])
ep(Path('/tmp/data.csv'), [], {}, ['/tmp'])
try: ep(Path('/tmp/script.sh'), [], {}, ['/tmp'])
except PermissionError: print("ExtWritePolicy correctly blocked .sh")
ExtWritePolicy correctly blocked .sh

You can register it with allow just like the built-in policies:

allow({Path: [('write_text', ExtWritePolicy(['.csv', '.json', '.txt']))]})

Configuration

safepyrun loads an optional user config from {xdg_config_home}/safepyrun/config.py at import time, after all defaults are registered. This lets you permanently extend the sandbox allowlists without modifying the package. The config file is executed with all safepyrun.core globals already available, so no imports are needed. This includes allow, allow_write_types, AllowPolicy, PathWritePolicy, PosAllowPolicy, OpenWritePolicy, and all standard library modules already imported by the module.

Example ~/.config/safepyrun/config.py (Linux) or ~/Library/Application Support/safepyrun/config.py (macOS):

import pandas

# Add pandas tools
allow({pandas.DataFrame: ['head', 'describe', 'info', 'shape']})

# Allow pandas to write CSV; the destination (arg 0 or path_or_buf) is checked against ok_dests
allow({pandas.DataFrame: [('to_csv', PosAllowPolicy(0, 'path_or_buf'))]})

If the config file has errors, a warning is emitted and the defaults remain intact.

CLI

safepyrun ships with a command-line tool that runs a Python script file in the sandbox. You can pass a file path, or pipe code in via stdin:

# Run a script file
$ safepyrun myscript.py

# Pipe code via stdin
$ echo "1+1" | safepyrun

The result of the last expression is printed to stdout, matching the behaviour of pyrun in Python. Errors are reported to stderr.