<msgs><code id="_955b9784">#| default_exp core</code><note id="_0aafe008"># safepython</note><code id="_468aa264" export>from fastcore.utils import *
from fastcore.meta import delegates
from fastcore.xtras import asdict
from fastcore.xdg import xdg_config_home
from fastcore.docments import MarkdownRenderer
from pyskills import *
from pyskills import __pytools__
from inspect import currentframe,Parameter,signature
from contextvars import ContextVar
from types import MappingProxyType
import linecache,builtins,inspect,types,subprocess,ast,asyncio</code><code id="_af5b0967">from fastcore.test import expect_fail,test_eq
from fastcore.test import test_eq, test_fail
import string</code><note id="_b91c2756">## Helpers and setup</note><code id="_16c25d79" export># ContextVar fallback for when stack walking fails (e.g. inside asyncio.gather)
_rp_globals = ContextVar('_rp_globals', default=None)
def _find_frame_dict(sentinel:str):
    """Find the globals dict containing `sentinel`, or the calling frame's globals if no sentinel.

    The search starts two frames up (the caller of whoever called this helper), so it is meant
    to be reached through one wrapper such as `find_var`. If no frame matches, the globals
    stored in `_rp_globals` are tried next, and finally this module's own globals are returned.
    """
    frame = currentframe().f_back.f_back
    # No sentinel: use the starting frame directly. It is `None` when the direct caller is
    # already the outermost frame; in that case drop through to the fallbacks below.
    if not sentinel and frame is not None: return frame.f_globals
    while sentinel and frame:
        if sentinel in frame.f_globals: return frame.f_globals
        frame = frame.f_back
    # Fall back to RunPython globals stored in ContextVar (e.g. from asyncio.gather)
    rpg = _rp_globals.get()
    if rpg and sentinel in rpg: return rpg
    return globals()
</code><note id="_d48e0e99">`_find_frame_dict` walks the call stack looking for a frame whose globals contain `sentinel`. This lets `RunPython` find the caller's namespace without requiring an explicit globals dict. If no frame contains the sentinel, it falls back to the globals stored in the `_rp_globals` ContextVar (for cases where stack walking fails, e.g. inside `asyncio.gather`), and finally to its own module globals.</note><code id="_fa15f5b0">_test_sentinel = True
d = _find_frame_dict('_test_sentinel')
assert '_test_sentinel' in d
d2 = _find_frame_dict('nonexistent_sentinel_xyz')
assert d2 is not None</code><code id="_99383e45" export>def find_var(var:str):
"Search for var in all frames of the call stack"
return _find_frame_dict(var)[var]</code><code id="_2a3ff1c2">find_var('_test_sentinel')</code><note id="_4af40cc9">## Builtins and wrappers</note><code id="_14a70f13" export>from fastaudit import *</code><code id="_de1fbbd5" export>mon_disable_policy = dict(callees=set(), callers=set(), pairs=set(), preds=(),
callee_prefixes=(), callee_suffixes=('.dumps','.loads'),
caller_prefixes=(), caller_suffixes=())
def freeze_mon_policy(p):
"Freeze monitoring policy for one run"
return dict(callees=frozenset(p.get('callees', ())), callers=frozenset(p.get('callers', ())),
pairs=frozenset(p.get('pairs', ())), preds=tuple(p.get('preds', ())),
callee_prefixes=tuple(p.get('callee_prefixes', ())), callee_suffixes=tuple(p.get('callee_suffixes', ())),
caller_prefixes=tuple(p.get('caller_prefixes', ())), caller_suffixes=tuple(p.get('caller_suffixes', ()))
)</code><note id="_fe4ad32c">`mon_disable_policy` is mutable module-level configuration for cheap `sys.monitoring` DISABLE decisions. `freeze_mon_policy` snapshots it for a single run so policy checks are stable while code executes.</note><code id="_2b3a8099" export>mon = sys.monitoring
def on_call(caller, callee, fn, code, off, data, calls):
"Fast monitoring callback to decide if event should be DISABLEd"
p = data['mon_policy']
if callee in p['callees'] or caller in p['callers'] or (caller,callee) in p['pairs']: return mon.DISABLE
if callee.startswith(p['callee_prefixes']) or callee.endswith(p['callee_suffixes']): return mon.DISABLE
if caller.startswith(p['caller_prefixes']) or caller.endswith(p['caller_suffixes']): return mon.DISABLE
if any(o(caller, callee, fn, code, off, calls) for o in p['preds']): return mon.DISABLE</code><note id="_f05a7b5d">`on_call` uses only caller/callee names, call pairs, prefixes/suffixes, and optional predicates to decide whether this call site can be disabled for performance. Since fastaudit raises audit events for sys.monitoring c-call events, they can also be checked there when args/kwargs policy checks are needed.</note><code id="_2af8b7f2" export>def frame_args(fr, obj=None):
"Introspection helper used before audit denies an op; check whether it happened inside an approved call"
c = fr.f_code
n,nkw = c.co_argcount,c.co_kwonlyargcount
args = [fr.f_locals[o] for o in c.co_varnames[:n] if o in fr.f_locals]
if obj is not None and args and args[0] is obj: args = args[1:]
kw = {o:fr.f_locals[o] for o in c.co_varnames[n:n+nkw] if o in fr.f_locals}
i = n+nkw
if c.co_flags & inspect.CO_VARARGS:
args += list(fr.f_locals.get(c.co_varnames[i], ()))
i += 1
if c.co_flags & inspect.CO_VARKEYWORDS: kw.update(fr.f_locals.get(c.co_varnames[i], {}))
return args,kw</code><note id="_5a2a2e04">Check on a plain function, including normal positional arguments, keyword-only arguments, and extra `**kwargs`:</note><code id="_1dd53f6c">def some_tool(path, text, *, overwrite=False, **kwargs): return frame_args(currentframe())
args,kwargs = some_tool('path', 'text', overwrite=True, a='b')
test_eq(args, ['path', 'text'])
test_eq(kwargs, {'overwrite': True, 'a': 'b'})</code><note id="_abaf626e">With methods removes `self` from the returned positional arguments:</note><code id="_5fd61cfd">class F:
def some_meth(self, path, text, *, overwrite=False, **kwargs): return frame_args(currentframe(), self)
args,kwargs = F().some_meth('path', 'text', overwrite=True, a='b')
test_eq(args, ['path', 'text'])
test_eq(kwargs, {'overwrite': True, 'a': 'b'})</code><code id="_ae19467f" export>def _ctx_check(s, name, obj, a, kw, data):
"Check a pytool entry set for name, including validator tuples"
if ... in s or name in s: return True
for x in s:
if isinstance(x, tuple) and x[0] == name:
a = list(a[1:] if obj is not None and a and a[0] is obj else a)
x[1](obj, a, kw, data)
return True
return False
def _call_allowed(c, data):
    """Check one logical call against registered pytools.

    `c` provides `fn`, `module`, `qualname`, `args` and `kwargs`; `data['pytools']` maps modules
    and classes to their registration sets. Returns `True` when the call is registered on its
    module, or on the owner named by its qualname (for a class, anywhere in its MRO), and
    `False` otherwise. Validator entries are run by `_ctx_check`.
    """
    pytools = data['pytools']
    mod = sys.modules.get(c.module)
    # `qualname` may be None, so normalise it once instead of guarding every use
    qn = c.qualname or ''
    nm = getattr(c.fn, '__name__', None) or qn.rsplit('.', 1)[-1]
    if not nm: return False
    if mod in pytools and _ctx_check(pytools[mod], nm, None, c.args, c.kwargs, data): return True
    if mod and '.' in qn:
        cls = getattr(mod, qn.rsplit('.', 1)[0], None)
        # Only treat the first positional arg as `self` when the owner really is a class;
        # `isinstance` raises TypeError for a non-class second argument
        is_self = isinstance(cls, type) and c.args and isinstance(c.args[0], cls)
        obj = c.args[0] if is_self else None
        # A class is the first entry of its own MRO; anything else is checked on its own
        for o in getattr(cls, '__mro__', None) or (cls,):
            if o in pytools and _ctx_check(pytools[o], nm, obj, c.args, c.kwargs, data): return True
    return False
def _ctx_allowed(info):
"Check all logical calls in deny info against registered pytools"
return any(_call_allowed(c, info.data) for c in info.calls)</code><note id="_31c457fd">`_ctx_check` checks one pytool registration set for a matching name. It returns `True` for direct name matches or allow-all (`...`), and for validator tuples it calls the validator with `(obj, args, kwargs, data)` before returning. It exists so module, class, tracked-call, and frame-call checks all share the same validator logic.
`_call_allowed` checks one logical `CallInfo` against the pytool registry. It returns `True` when the call is allowed either at module level or class/MRO level, including constrained validator entries. It exists to centralize pytool lookup for both async tracked calls and stack-frame fallback calls.
`_ctx_allowed` checks every logical call in a `DenyInfo`. It returns `True` if any tracked or frame-derived call is registered as allowed. It exists so `before_deny` no longer duplicates stack walking or call matching logic.</note><code id="_53fe97bf" export>class RawDenyInfo:
def __init__(self, args, frame, msg, calls, frame_args): store_attr()
class CallInfo:
    "One logical call: the callable (if known), its arguments, and where the record came from"
    def __init__(self, fn=None, args=(), kwargs=None, module=None, qualname=None, name=None, frame=None, source=None):
        # Normalise the defaults first; `store_attr` then stores the parameters on `self`
        if not kwargs: kwargs = {}
        derive_name = name is None and module and qualname
        if derive_name: name = f'{module}.{qualname}'
        store_attr()
class DenyInfo:
def __init__(self, event, args, frame, msg, data, calls, frame_args):
self.event,self.data = event,data
self.raw = RawDenyInfo(args, frame, msg, calls, frame_args)
self.tracked_calls = L(calls).map(self._tracked_call)
self.frame_calls = L(self._frame_calls())
self.calls = self.tracked_calls + self.frame_calls
self.call = first(self.calls, None)
self.args = self.call.args if self.call else ()
self.kwargs = self.call.kwargs if self.call else {}
def _tracked_call(self, c): return CallInfo(c.fn, c.args, c.kwargs, c.module, c.qualname, c.name, source='tracked')
def find(self, name): return first(o for o in self.calls if o.name == name or o.qualname == name)
def _frame_calls(self):
fr = self.raw.frame
while fr:
mod = sys.modules.get(fr.f_globals.get('__name__'))
qn = fr.f_code.co_qualname
try: a,kw = self.raw.frame_args(fr)
except Exception: a,kw = (),{}
yield CallInfo(args=a, kwargs=kw, module=getattr(mod, '__name__', None), qualname=qn, frame=fr, source='frame')
fr = fr.f_back</code><note id="_8cd04000">`DenyInfo` is the public object passed to `pre_deny`. It exposes `event`, `data`, `call`, `args`, `kwargs`, `calls`, `tracked_calls`, and `frame_calls`, with raw audit details under `raw`. It exists to make policy callbacks ergonomic while preserving access to lower-level audit information when needed.</note><code id="_d0468fcc" export>def before_deny(event, args, frame, msg, data, calls, pre_deny=None, _frame_args=frame_args):
"Check whether a possibly-denied audit event happened inside an approved call."
info = DenyInfo(event, args, frame, msg, data, calls, _frame_args)
if pre_deny and (pre := pre_deny(info)) is not None: return pre
if _ctx_allowed(info): return True</code><note id="_a7b3ba59">`before_deny` is the audit-policy adapter called by `fastaudit`. It builds a `DenyInfo`, gives `pre_deny(info)` first refusal, then falls back to registered pytool checks via `_ctx_allowed`. It returns `True` to allow the audited operation, or `None` to leave it denied by the audit layer.</note><note id="_17f4ab6e">Check direct allow-by-name:</note><code id="_152c1a94">tst_data = dict(pytools={sys.modules[__name__]: {'_bd_allowed'}}, ok_dests=set())
def _bd_allowed(): return before_deny(None, None, currentframe(), None, data=tst_data, calls=[])
def _bd_denied (): return before_deny(None, None, currentframe(), None, data=tst_data, calls=[])
assert _bd_allowed()
assert not _bd_denied()</code><note id="_989a7bc6">Check the registered method passes its object, recovered args, kwargs, and allowed destinations:</note><code id="_1673977b">_seen = []
def _bd_check(obj, args, kw, data): _seen.append((obj, args, kw, data['ok_dests']))
class _BDT:
def save(self, path, *, overwrite=False): return before_deny(None, None, currentframe(), None, data=tst_data, calls=[])
tst_data = dict(pytools={_BDT: [('save', _bd_check)]}, ok_dests={'/tmp'})
t = _BDT()
test_eq(t.save('/tmp/x', overwrite=True), True)
test_eq(_seen, [(t, ['/tmp/x'], dict(overwrite=True), {'/tmp'})])</code><note id="_a94ced07">## Main implementation</note><code id="_12b831de" export>def srcfn(src):
"Stores src in linecache under <pyrun_{i%10}>, returns the name."
linecache.cache[fn] = (len(src), None, src.splitlines(keepends=True), fn:=f'<pyrun_{srcfn.i}>')
srcfn.i = (srcfn.i + 1)%10
return fn
srcfn.i=0</code><code id="_d159c2d1">srcfn(''),srcfn('')</code><code id="_7dad9339" export>_builtins = dict(builtins.__dict__)
async def __run_python(code:str, g=None, ok_dests=None):
_rp_globals.set(g)
rg = g | dict(__builtins__=_builtins, __name__='<pyrun>')
loc = {}
async def run(src, is_exec=True):
comp = compile(src, srcfn(src), 'exec' if is_exec else 'eval', flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT)
r = eval(comp, rg, loc)
return (await r) if inspect.isawaitable(r) else r
tree = ast.parse(code)
warnings.filterwarnings('ignore', category=SyntaxWarning)
res = None
if tree.body and isinstance(tree.body[-1], ast.Expr):
last = tree.body.pop()
if tree.body:
await run(ast.unparse(ast.Module(tree.body, [])))
rg.update(loc) # generators resolve inner vars from globals, so we need to make sure they are in `loc`
res = await run(ast.unparse(ast.Expression(last.value)), False)
else: await run(code)
g.update(loc)
return res</code><note id="_b3932b1f">`__run_python` executes user code with separate globals (`rg`) and locals (`loc`) mappings. Earlier top-level assignments are written into `loc`. Most final expressions can still see those names because they are evaluated with the same locals mapping.
Generator expressions are different: they run in their own nested scope. When the generator body later resolves a name assigned by an earlier statement, it looks in the globals mapping (`rg`), not the outer `eval` locals mapping (`loc`). So code like `x = 10; sum(i+x for i in [1,2])` failed with `NameError` because `x` was only in `loc`.
After executing the non-final statements, `rg.update(loc)` copies those assigned names into the globals mapping before evaluating the final expression. This preserves the notebook-style behavior where names defined earlier in the cell are visible inside generators, comprehensions, lambdas, and other nested scopes used by the final expression.</note><code id="_0cee0fc7">await __run_python(
"rnd_var_name_123 = 10\n"
"sum(x + rnd_var_name_123 for x in [1, 2])",
g={}
)</code><note id="_1b71622e">`_run_python` builds the per-run policy bundle passed to `fastaudit`, including approved pytools, allowed destinations, and the frozen monitoring policy snapshot.</note><code id="_0f9e4c14" export>allow_imports = set()</code><code id="_7ccc8d88" export>async def _wait_bg(before):
"Wait for tasks created since `before`; surface their failures."
cur = asyncio.current_task()
excs = []
while ts := [t for t in asyncio.all_tasks()-before if t is not cur and not t.done()]:
excs += [r for r in await asyncio.gather(*ts, return_exceptions=True)
if isinstance(r, BaseException) and not isinstance(r, asyncio.CancelledError)]
if excs: raise first(excs, risinstance(PermissionError)) or excs[0]</code><code id="_d79ccebc" export>async def _run_python(code:str, g=None, ok_dests=None, pre_deny=None, **kwargs):
_rp_globals.set(g)
data = dict(pytools=MappingProxyType({k:frozenset(v) for k,v in __pytools__.items()}),
ok_dests=ok_dests or (), mon_policy=freeze_mon_policy(mon_disable_policy)) | kwargs
denyf = partial(before_deny, pre_deny=pre_deny)
before = asyncio.all_tasks()
with mk_audit(ok_dests, before_deny=denyf, data=data, allow_imports=frozenset(allow_imports), on_call=on_call)():
res = await __run_python(code=code, g=g, ok_dests=ok_dests)
await _wait_bg(before)
return res</code><note id="_ecf3c2a4">`_run_python` snapshots `asyncio.all_tasks()` before running and waits for any new tasks (including tasks they spawn in turn) before returning, so a tool call only completes when all its background work has. If a background task fails, the tool call raises that error. Tasks spawned by tool code with `asyncio.create_task` would otherwise outlive the audit context.</note><code id="_6cf7bdf6">res = []
async def bg_job():
await asyncio.sleep(0.1)
res.append('finished')
r = await _run_python("asyncio.create_task(bg_job())\n'returned'", g=dict(asyncio=asyncio, bg_job=bg_job), ok_dests=())
test_eq(r, 'returned')
test_eq(res, ['finished']) # bg task completed before the call returned</code><code id="_5656cead">async def bg_bad():
await asyncio.sleep(0.01)
subprocess.run(['ls'])
with expect_fail(PermissionError, 'subprocess.Popen blocked'):
await _run_python("asyncio.create_task(bg_bad())", g=dict(asyncio=asyncio, bg_bad=bg_bad), ok_dests=())</code><code id="_5d38a1d0" export>default_ok_dests = ()</code><code id="_91b321ca" export>def _check_user_code(tree, ban_imports, ban_defs):
"Reject user code that imports banned modules, defines names, mentions importlib, or calls exec/eval/compile"
for n in ast.walk(tree):
if ban_defs and isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
raise PermissionError(f"def/class not allowed: {n.name}")
if isinstance(n, ast.Import):
for a in n.names:
if a.name in ban_imports: raise PermissionError(f"import {a.name!r} not allowed")
elif isinstance(n, ast.ImportFrom):
if n.module and n.module in ban_imports: raise PermissionError(f"from {n.module!r} import not allowed")
elif isinstance(n, ast.Call) and isinstance(n.func, ast.Name) and n.func.id in {'exec','eval','compile'}:
raise PermissionError(f"{n.func.id}() not allowed")
elif isinstance(n, ast.Name) and n.id=='importlib': raise PermissionError("importlib not allowed")</code><note id="_8daedb2b">We have some extra rules based on the raw code being run (as opposed to the code inside the implementations being called):
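- No imports of modules listed in `ban_imports` (by default `socket` and `importlib`); other imports are allowed
- No calls to `exec`, `eval` or `compile`, and no references to the name `importlib`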
- No defining functions or classes.
`_check_user_code` is responsible for these rules.</note><code id="_5447b52c" export>def _find_perm_err(e):
"Walk the exception chain looking for a PermissionError"
pe = e
while pe:
if isinstance(pe, PermissionError): return pe
pe = pe.__cause__ or pe.__context__
return e</code><note id="_9713d9e3">Some libs (like `httpcore`) wrap our permission errors, so we can't handle them directly. We unwrap the error in that situation.</note><code id="_5f44108c" export>class RunPython:
    """Execute Python with audit-hook safety checks and access to LLM tools, returning last expression.
    `import` works in the usual way, except for modules listed in `ban_imports`. All builtins are available.
    Multiline code blocks can be used, including defining variables; `def`/`class` definitions are rejected unless `ban_defs` is false.
    **NB**: Locals are exported back to the caller's namespace."""
    def __init__(self, g=None, sentinel=None, ok_dests=UNSET, ban_imports=frozenset({'socket','importlib'}), ban_defs=True,
                 pre_deny=None, **kwargs):
        if ok_dests is UNSET: ok_dests = default_ok_dests
        if g is None:
            # Prefer the IPython user namespace when running inside IPython
            try: ip = get_ipython()
            except NameError: ip = None
            g = getattr(ip, 'user_ns', None)
        # Must be called directly from here: `_find_frame_dict` starts its search two frames up
        if g is None: g = _find_frame_dict(sentinel)
        self.g = g
        self.ok_dests = ok_dests or ()
        self.ban_imports,self.ban_defs,self.pre_deny,self.kwargs = ban_imports,ban_defs,pre_deny,kwargs
    async def __call__(self, code:str):
        "Check and run `code`; on failure re-raise with the traceback trimmed to begin at the first `<pyrun...>` frame"
        try:
            _check_user_code(ast.parse(code), ban_imports=self.ban_imports, ban_defs=self.ban_defs)
            return await _run_python(code, g=self.g, ok_dests=self.ok_dests, pre_deny=self.pre_deny, **self.kwargs)
        except Exception as e:
            e = _find_perm_err(e)
            tb = e.__traceback__
            # `tb` is None when the unwrapped exception was chained without ever being raised
            while tb is not None and tb.tb_next and not tb.tb_frame.f_code.co_filename.startswith('<pyrun'): tb = tb.tb_next
            raise e.with_traceback(tb) from None</code><note id="_f1b8972a">`RunPython` is the public API. It captures the caller's globals via `_find_frame_dict`, optionally takes `ok_dests` for write-checking, and executes code under the audit-hook policy.</note><code id="_d0bd7086">pyrun = RunPython()</code><code id="_d0e59c71">await pyrun('[]')</code><code id="_725dc98c">async def f(): return 1</code><code id="_8c093ad4">await pyrun('await f()')</code><note id="_305dae7c">Passing an empty dict as the namespace results in a clean environment:</note><code id="_2c31cd80">g = {}
await RunPython(g=g)('_ns_test_var = 42')
test_eq(list(g.keys()), ['_ns_test_var'])
assert '_ns_test_var' not in globals()</code><code id="_9105f690" export>@delegates(RunPython)
def create_pyrun_magic(shell=None, pyrun=None, **kwargs):
"Create magic"
if not shell: shell = get_ipython()
if not pyrun: pyrun = RunPython(**kwargs)
def f(line, cell=None):
if line=='-o': return pyrun
if not cell: return
return pyrun(cell)
shell.register_magic_function(f, 'line_cell', 'py')</code><code id="_5da01116">create_pyrun_magic()</code><code id="_8f1290cc">%%py
print('tt')</code><code id="_8210f966">%%py
type('t')</code><code id="_2fdebd30">%%py
a = 1
a+=2
a</code><note id="_d8c2039b">Unpacking is allowed:</note><code id="_2718662d">%%py
a = [1,2,3]
print(*a)</code><code id="_9f0b2705">def f(): warnings.warn('a warning')</code><code id="_9c90274e">%%py
print("asdf")
f()
1+1</code><note id="_e4242050">Classes and functions can not be created:</note><code id="_885bb2d7">with expect_fail(PermissionError):
await pyrun('class A:\n def __init__(self): print("hi")')
with expect_fail(PermissionError):
await pyrun('def f(): print("hi")')</code><code id="_7a7fdd13">with expect_fail(PermissionError): await pyrun('os.system("ls")')</code><code id="_4834cc35">%%py
print(os.unlink)
print(type(os.unlink))
print(os.unlink.__qualname__)</code><code id="_fad5d28c">class C: ...
c = C()</code><code id="_5e4565da">%%py
isinstance(C, type)</code><code id="_020634d9">%%py
isinstance(c, type)</code><note id="_8155a50d">### `allow_write_types`</note><code id="_de2cd76e">o = SimpleNamespace(x=1)
await pyrun('''
d = {}
d["x"] = 1
o.x = 2
d["x"],o.x''')</code><code id="_2324abe0" export>_path_wp = PathWritePolicy()
_dst1 = PosAllowPolicy(1, 'dst')
_rename_wp = PathWritePolicy(target_pos=0, target_kw='target')</code><note id="_afeb38c8">## Config</note><note id="_a33fbd3b">`safepyrun` loads an optional user config from `{xdg_config_home}/safepyrun/config.py` at import time, after all defaults are registered. This lets users permanently extend the sandbox allowlists without modifying the package. The config file is executed with all `safepyrun.core` globals already available — no imports needed. This includes `allow`, `allow_write_types`, `AllowPolicy`, `PathWritePolicy`, `PosAllowPolicy`, `OpenWritePolicy`, and all standard library modules already imported by the module.
Example `~/.config/safepyrun/config.py` (Linux) or `~/Library/Application Support/safepyrun/config.py` (macOS):
```python
# Add pandas tools
allow({pandas.DataFrame: ['head', 'describe', 'info', 'shape']})
```
If the config file has errors, a warning is emitted and the defaults remain intact.</note><code id="_e2c7e67d" export>_cfg_py = xdg_config_home() / 'safepyrun' / 'config.py'
if _cfg_py.exists():
try:
_cfg_ns = {k:v for k,v in globals().items() if not k.startswith('_')}
exec(_cfg_py.read_text(), _cfg_ns)
# TODO: Need to update this for the new set-data/allow API
if 'default_ok_dests' in _cfg_ns: globals()['default_ok_dests'] = _cfg_ns['default_ok_dests']
except Exception as e: warnings.warn(f"Failed to load {_cfg_py}: {e}")</code><note id="_5d8bbd88">## Examples</note><code id="_6debe6b5">%%py
a = {"b":1}
list(a.items())</code><code id="_ea89fdcd">%%py
Path().exists()</code><code id="_c59e4bbe">%%py
os.path.join('/foo', 'bar', 'baz.py')</code><code id="_84f1e246">%%py
a_=3</code><code id="_f863678c">#| eval: false
a_</code><code id="_51bc23b1">%%py
aa_='33'</code><code id="_8e9c3558">%%py
len(aa_)</code><code id="_a4990482">def g(): ...</code><code id="_131fa624">%%py
inspect.getsource(g)</code><code id="_4d8fc1d9">with expect_fail(PermissionError): await pyrun("os.unlink('/foo/bar')")</code><code id="_1be51d80">async def agen():
for x in [1,2]: yield x</code><code id="_2d974790">%%py
res = []
async for x in agen(): res.append(x)
res</code><code id="_64247c3c">import asyncio
async def fetch(n): return n * 10</code><code id="_654e0dbb">%%py
print(string.ascii_letters)
await asyncio.gather(fetch(1), fetch(2), fetch(3))</code><code id="_e5ac6304">import numpy as np</code><code id="_051a7ba4">%%py
np.array([1,2,3]).sum()</code><note id="_e9fcdbf1">### Allow policy examples</note><code id="_d7cb9b15">pyrun2 = RunPython(ok_dests=['/tmp'])</code><code id="_3a0e91e9">await pyrun2("Path('/tmp/test_write.txt').write_text('hello')")</code><code id="_6dc36d93">await pyrun2("open('/tmp/test_open.txt', 'w').write('hi')")</code><code id="_f426e9de">with expect_fail(PermissionError): await pyrun2("Path('/etc/evil.txt').write_text('bad')")
with expect_fail(PermissionError): await pyrun2("open('/root/bad.txt', 'w')")</code><code id="_19a9ba34">await pyrun2("open('/etc/passwd', 'r').read(10)")</code><code id="_5989bc35">await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/tmp/test_copy.txt')")</code><code id="_09cddc82">with expect_fail(PermissionError): await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/root/bad.txt')")
with expect_fail(PermissionError): await pyrun("Path('/tmp/test.txt').write_text('nope')")</code><code id="_0ee640d4">pyrun_cwd = RunPython(ok_dests=['.'])
# Writing to cwd should work
await pyrun_cwd("Path('test_cwd_ok.txt').write_text('hello')")</code><code id="_e9b0edea">with expect_fail(PermissionError): await pyrun("Path('test_cwd_ok.txt').write_text('hello')")</code><code id="_20b5a7f2">Path('test_cwd_ok.txt').unlink(missing_ok=True)</code><code id="_f00287f1"># Writing to /tmp should be blocked (not in ok_dests)
with expect_fail(PermissionError): await pyrun_cwd("Path('/tmp/nope.txt').write_text('bad')")</code><code id="_3f076b4d"># Parent traversal should be blocked
with expect_fail(PermissionError): await pyrun_cwd("Path('../escape.txt').write_text('bad')")</code><code id="_43e30169"># Sneaky traversal via subdir/../../ should also be blocked
with expect_fail(PermissionError): await pyrun_cwd("Path('subdir/../../escape.txt').write_text('bad')")</code><note id="_74d66403">## `allow`</note><code id="_929b33a5">def trusted_echo(): return subprocess.run(['echo', 'hi'], capture_output=True, text=True)</code><code id="_3285f58a">with expect_fail(PermissionError): await pyrun("trusted_echo().stdout")
with expect_fail(PermissionError): await pyrun("import subprocess; subprocess.run(['echo', 'hi'])")</code><code id="_81b73411">allow(trusted_echo)
test_eq((await pyrun("trusted_echo().stdout")), "hi\n")
with expect_fail(PermissionError): await pyrun("import subprocess; subprocess.run(['echo', 'hi'])")</code><code id="_7e941dbf">class _MethT:
def echo (self): return run('echo hi')
def echo2(self): return run('echo hi')
t = _MethT()
with expect_fail(PermissionError): await pyrun("t.echo()")
with expect_fail(PermissionError): await pyrun("t.echo2()")
allow(_MethT.echo)
test_eq(await pyrun("t.echo()"), "hi")
allow({_MethT:['echo2']})
test_eq(await pyrun("t.echo2()"), "hi")</code><code id="_35b7b510">@patch
def echo3(self:_MethT): return run('echo hi')
with expect_fail(PermissionError): await pyrun("t.echo3()")
allow({_MethT:['echo3']})
test_eq(await pyrun("t.echo3()"), "hi")</code><code id="_bb23ba62">import httpx</code><code id="_37b7d46d">with expect_fail(PermissionError): await pyrun('httpx.get("http://example.org")')</code><code id="_d0771c6b">@allow
def getexample(): return httpx.get('http://example.org')</code><code id="_9aa4cb66">await pyrun('getexample()')</code><code id="_1acf06e4">def testevent():
sys.audit('pyrun.testevent')
return 'ok'</code><code id="_006232b9">with expect_fail(PermissionError): await pyrun('testevent()')</code><code id="_5d797ec9">@allow
def testevent2():
return testevent()</code><code id="_076ec64a">%%py
testevent2()</code><code id="_5661ac4c">def chk_url(obj, args, kw, data):
url = args[0] if args else kw.get('url','')
if url not in data.get('ok_urls', ()): raise PermissionError(url)</code><code id="_c845aa94">pyrun_urls = RunPython(ok_urls={'http://example.org'})</code><code id="_4fc5d0b0">allow({httpx.get: chk_url})
with expect_fail(PermissionError): await pyrun_urls('httpx.get("http://example.com")')
await pyrun_urls('httpx.get("http://example.org")')</code><code id="_a49a4700">@allow
def _httpget(url): return httpx.get(url)</code><code id="_98e6824f">def httpget(url):
sys.audit('myapp.httpget', url)
return _httpget(url)</code><code id="_c1ace36b">def url_allow(info):
if info.event=='myapp.httpget' and info.raw.args[0] in info.data.get('ok_urls', ()): return True</code><code id="_0e736193">pyrun_urls2 = RunPython(pre_deny=url_allow, ok_urls={'http://example.org'})
with expect_fail(PermissionError): await pyrun_urls2('httpget("http://example.com")')
await pyrun_urls2('httpget("http://example.org")')</code><note id="_741cabd2">## Plots</note><code id="_92e10e61" export>def allow_matplotlib():
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from matplotlib.axes import Axes
from matplotlib.axis import Axis
from matplotlib.spines import Spine, SpinesProxy
from matplotlib.backend_bases import FigureCanvasBase
from matplotlib.font_manager import FontManager
from matplotlib.transforms import Bbox
allow(Bbox.update_from_path, FigureCanvasBase.__init__, FontManager.findfont)
allow({
plt: ['plot','figure','axis', 'subplots', 'show', 'tight_layout', 'subplot', 'bar', 'scatter', 'hist',
'xlabel', 'ylabel', 'title', 'legend', 'grid', 'xlim', 'ylim', 'colorbar', 'imshow'],
Figure: ['tight_layout', 'set_size_inches', 'add_subplot', 'suptitle'],
Axes: ['plot', 'bar', 'barh', 'scatter', 'hist', 'set_xlabel', 'set_ylabel', 'set_title',
'legend', 'grid', 'set_xlim', 'set_ylim', 'tick_params', 'set_xticks', 'set_yticks',
'annotate', 'text', 'axhline', 'axvline', 'fill_between', 'twinx', 'imshow', 'pie',
'boxplot', 'errorbar', 'stem', 'loglog', 'semilogx', 'semilogy', 'set_xscale', 'set_yscale'],
Axis: ['set_major_formatter', 'set_minor_formatter', 'set_major_locator', 'set_minor_locator'],
Spine: ['set_visible'], SpinesProxy: ['set_visible'],
})
_savefig_wp = PosAllowPolicy(0, 'fname')
allow({Figure: [('savefig', _savefig_wp)], plt: [('savefig', _savefig_wp)]})</code><code id="_28202dd0">import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from matplotlib.axes import Axes
from matplotlib.axis import Axis
from matplotlib.spines import Spine, SpinesProxy</code><code id="_7fb4f5d1">allow_matplotlib()</code><code id="_c94ec416">%%py
fig, ax = plt.subplots(figsize=(3,2), dpi=100)
ax.plot([1,2,1], label='Triangle')
ax.plot([1,1.5,2], label='Rising')
ax.tick_params(labelsize=10)
ax.set_xlabel('X', fontsize=10)
ax.set_ylabel('Y', fontsize=10)
ax.spines[['top','right']].set_visible(False)
ax.yaxis.set_major_formatter('{x:.0f}')
ax.legend(fontsize=12);</code><code id="_7ea5e69c">#| eval: false
pyrun_tmp = RunPython(ok_dests=['/tmp'])
with expect_fail(PermissionError): await pyrun_tmp("fig.savefig(os.path.expanduser('~/plot.png'))")
await pyrun_tmp("fig.savefig('/tmp/plot.png')")</code><code id="_8d1cb417" export>def load_ipython_extension(ip):
ns = ip.user_ns
ns['pyrun'] = pyrun = RunPython(g=ns)
ns['allow'] = allow
create_pyrun_magic(ip, pyrun)</code><code id="_30964442" export>from fastcore.script import call_parse, Param</code><code id="_74c49b7c" export>@call_parse
def cli(path: Param("Path to script, or '-' for stdin", str, opt=False, nargs='?', default='-')):
"Run a python script file in the safepyrun sandbox"
try:
code = sys.stdin.read() if path == '-' else Path(path).read_text()
if (r := asyncio.run(RunPython()(code))) is not None: print(r)
except Exception as e: return not print(f"Error: {e}", file=sys.stderr)</code><note id="_9911c2a6">## export -</note><code id="_e8671c65">#| hide
from nbdev import nbdev_export
nbdev_export()</code></msgs>