from fastcore.test import expect_fail,test_eq
from fastcore.test import test_eq, test_fail
import stringsafepython
Helpers and setup
_find_frame_dict walks the call stack looking for a frame whose globals contain sentinel. This lets RunPython find the caller’s namespace without requiring an explicit globals dict. If no sentinel is found, it falls back to its own module globals.
_test_sentinel = True
d = _find_frame_dict('_test_sentinel')
assert '_test_sentinel' in d
d2 = _find_frame_dict('nonexistent_sentinel_xyz')
assert d2 is not Nonefind_var
def find_var(
var:str
):
Search for var in all frames of the call stack
find_var('_test_sentinel')True
Builtins and wrappers
freeze_mon_policy
def freeze_mon_policy(
p
):
Freeze monitoring policy for one run
mon_disable_policy is mutable module-level configuration for cheap sys.monitoring DISABLE decisions. freeze_mon_policy snapshots it for a single run so policy checks are stable while code executes.
on_call
def on_call(
caller, callee, fn, code, off, data
):
Fast monitoring callback to decide if event should be DISABLEd
on_call uses only caller/callee names, call pairs, prefixes/suffixes, and optional predicates to decide whether this call site can be disabled for performance. Since fastaudit raises audit events for sys.monitoring c-call events, they can also be checked there when args/kwargs policy checks are needed.
frame_args
def frame_args(
fr, obj:NoneType=None
):
Introspection helper used before audit denies an op; check whether it happened inside an approved call
Check on a plain function, including normal positional arguments, keyword-only arguments, and extra **kwargs:
def some_tool(path, text, *, overwrite=False, **kwargs): return frame_args(currentframe())
args,kwargs = some_tool('path', 'text', overwrite=True, a='b')
test_eq(args, ['path', 'text'])
test_eq(kwargs, {'overwrite': True, 'a': 'b'})With methods removes self from the returned positional arguments:
class F:
def some_meth(self, path, text, *, overwrite=False, **kwargs): return frame_args(currentframe(), self)
args,kwargs = F().some_meth('path', 'text', overwrite=True, a='b')
test_eq(args, ['path', 'text'])
test_eq(kwargs, {'overwrite': True, 'a': 'b'})before_deny
def before_deny(
event, args, frame, msg, data, pre_deny:NoneType=None,
_frame_args:function=frame_args, # Bind [`frame_args`](https://AnswerDotAI.github.io/safepyrun/core.html#frame_args) locally to protect from overwriting
):
Check whether a possibly-denied audit event happened inside an approved call.
before_deny walks the stack looking for an approved pytool call. It either allows by function name directly, or calls a validator with the recovered arguments before allowing.
Check direct allow-by-name:
tst_data = dict(pytools={sys.modules[__name__]: {'_bd_allowed'}}, ok_dests=set())
def _bd_allowed(): return before_deny(None, None, currentframe(), None, data=tst_data)
def _bd_denied (): return before_deny(None, None, currentframe(), None, data=tst_data)
assert _bd_allowed()
assert not _bd_denied()Check the registered method passes its object, recovered args, kwargs, and allowed destinations:
_seen = []
def _bd_check(obj, args, kw, data): _seen.append((obj, args, kw, data['ok_dests']))
class _BDT:
def save(self, path, *, overwrite=False): return before_deny(None, None, currentframe(), None, data=tst_data)
tst_data = dict(pytools={_BDT: [('save', _bd_check)]}, ok_dests={'/tmp'})
t = _BDT()
test_eq(t.save('/tmp/x', overwrite=True), True)
test_eq(_seen, [(t, ['/tmp/x'], dict(overwrite=True), {'/tmp'})])Main implementation
srcfn
def srcfn(
src
):
Stores src in linecache under <pyrun_{i%10}>, returns the name.
srcfn(''),srcfn('')('<pyrun_0>', '<pyrun_1>')
__run_python
async def __run_python(
code:str, g:NoneType=None, ok_dests:NoneType=None
):
Call self as a function.
__run_python executes user code with separate globals (rg) and locals (loc) mappings. Earlier top-level assignments are written into loc. Most final expressions can still see those names because they are evaluated with the same locals mapping.
Generator expressions are different: they run in their own nested scope. When the generator body later resolves a name assigned by an earlier statement, it looks in the globals mapping (rg), not the outer eval locals mapping (loc). So code like x = 10; sum(i+x for i in [1,2]) failed with NameError because x was only in loc.
After executing the non-final statements, rg.update(loc) copies those assigned names into the globals mapping before evaluating the final expression. This preserves the notebook-style behavior where names defined earlier in the cell are visible inside generators, comprehensions, lambdas, and other nested scopes used by the final expression.
await __run_python(
"rnd_var_name_123 = 10\n"
"sum(x + rnd_var_name_123 for x in [1, 2])",
g={}
)23
_run_python builds the per-run policy bundle passed to fastaudit, including approved pytools, allowed destinations, and the frozen monitoring policy snapshot.
We have some extra rules based on the raw code being run (as opposed to the code inside the implementations being called): - No imports - No exec/eval - No defining functions or classes. _check_user_code is responsible for these rules.
Some libs (like httpcore) wrap our permission errors, so we can’t handle them directly. We unwrap the error in that situation.
RunPython
def RunPython(
g:NoneType=None, sentinel:NoneType=None, ok_dests:Unset=UNSET,
ban_imports:frozenset=frozenset({'socket', 'importlib'}), ban_defs:bool=True, pre_deny:NoneType=None,
kwargs:VAR_KEYWORD
):
Execute Python with audit-hook safety checks and access to LLM tools, returning last expression. import works in the usual way. All builtins are available. Multiline code blocks can be used, including defining functions and variables. NB: Locals are exported back to the caller’s namespace.
RunPython is the public API. It captures the caller’s globals via _find_frame_dict, optionally takes ok_dests for write-checking, and executes code under the audit-hook policy.
pyrun = RunPython()await pyrun('[]')[]
async def f(): return 1await pyrun('await f()')1
This test assures that the we can use an empty dict as the namespace and that it does not fall back to the global namespace as before.
g = {}
await RunPython(g=g)('_ns_test_var = 42')
test_eq(list(g.keys()), ['_ns_test_var'])
assert '_ns_test_var' not in globals()create_pyrun_magic
def create_pyrun_magic(
shell:NoneType=None, pyrun:NoneType=None, g:NoneType=None, sentinel:NoneType=None, ok_dests:Unset=UNSET,
ban_imports:frozenset=frozenset({'socket', 'importlib'}), ban_defs:bool=True, pre_deny:NoneType=None
):
Create magic
create_pyrun_magic()print('tt')tt
type('t')str
a = 1
a+=2
a3
Unpacking is allowed:
a = [1,2,3]
print(*a)1 2 3
def f(): warnings.warn('a warning')print("asdf")
f()
1+1asdf
/var/folders/51/b2_szf2945n072c0vj2cyty40000gn/T/ipykernel_89792/3833129470.py:1: UserWarning: a warning
def f(): warnings.warn('a warning')
2
Classes and functions can not be created:
with expect_fail(PermissionError):
await pyrun('class A:\n def __init__(self): print("hi")')
with expect_fail(PermissionError):
await pyrun('def f(): print("hi")')with expect_fail(PermissionError): await pyrun('os.system("ls")')print(os.unlink)
print(type(os.unlink))
print(os.unlink.__qualname__)<built-in function unlink>
<class 'builtin_function_or_method'>
unlink
class C: ...
c = C()isinstance(C, type)True
isinstance(c, type)False
allow_write_types
o = SimpleNamespace(x=1)
await pyrun('''
d = {}
d["x"] = 1
o.x = 2
d["x"],o.x''')(1, 2)
Config
safepyrun loads an optional user config from {xdg_config_home}/safepyrun/config.py at import time, after all defaults are registered. This lets users permanently extend the sandbox allowlists without modifying the package. The config file is executed with all safepyrun.core globals already available — no imports needed. This includes allow, allow_write_types, AllowPolicy, PathWritePolicy, PosAllowPolicy, OpenWritePolicy, and all standard library modules already imported by the module.
Example ~/.config/safepyrun/config.py (Linux) or ~/Library/Application Support/safepyrun/config.py (macOS):
# Add pandas tools
allow({pandas.DataFrame: ['head', 'describe', 'info', 'shape']})If the config file has errors, a warning is emitted and the defaults remain intact.
Examples
a = {"b":1}
list(a.items())[('b', 1)]
Path().exists()True
os.path.join('/foo', 'bar', 'baz.py')'/foo/bar/baz.py'
a_=3a_3
aa_='33'len(aa_)2
def g(): ...inspect.getsource(g)'def g(): ...\n'
with expect_fail(PermissionError): await pyrun("os.unlink('/foo/bar')")async def agen():
for x in [1,2]: yield xres = []
async for x in agen(): res.append(x)
res[1, 2]
import asyncio
async def fetch(n): return n * 10print(string.ascii_letters)
await asyncio.gather(fetch(1), fetch(2), fetch(3))abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
[10, 20, 30]
import numpy as npnp.array([1,2,3]).sum()np.int64(6)
Allow policy examples
pyrun2 = RunPython(ok_dests=['/tmp'])await pyrun2("Path('/tmp/test_write.txt').write_text('hello')")5
await pyrun2("open('/tmp/test_open.txt', 'w').write('hi')")2
with expect_fail(PermissionError): await pyrun2("Path('/etc/evil.txt').write_text('bad')")
with expect_fail(PermissionError): await pyrun2("open('/root/bad.txt', 'w')")await pyrun2("open('/etc/passwd', 'r').read(10)")'##\n# User '
await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/tmp/test_copy.txt')")'/tmp/test_copy.txt'
with expect_fail(PermissionError): await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/root/bad.txt')")
with expect_fail(PermissionError): await pyrun("Path('/tmp/test.txt').write_text('nope')")pyrun_cwd = RunPython(ok_dests=['.'])
# Writing to cwd should work
await pyrun_cwd("Path('test_cwd_ok.txt').write_text('hello')")5
with expect_fail(PermissionError): await pyrun("Path('test_cwd_ok.txt').write_text('hello')")Path('test_cwd_ok.txt').unlink(missing_ok=True)# Writing to /tmp should be blocked (not in ok_dests)
with expect_fail(PermissionError): await pyrun_cwd("Path('/tmp/nope.txt').write_text('bad')")# Parent traversal should be blocked
with expect_fail(PermissionError): await pyrun_cwd("Path('../escape.txt').write_text('bad')")# Sneaky traversal via subdir/../../ should also be blocked
with expect_fail(PermissionError): await pyrun_cwd("Path('subdir/../../escape.txt').write_text('bad')")allow
def trusted_echo(): return subprocess.run(['echo', 'hi'], capture_output=True, text=True)with expect_fail(PermissionError): await pyrun("trusted_echo().stdout")
with expect_fail(PermissionError): await pyrun("import subprocess; subprocess.run(['echo', 'hi'])")allow(trusted_echo)
test_eq((await pyrun("trusted_echo().stdout")), "hi\n")
with expect_fail(PermissionError): await pyrun("import subprocess; subprocess.run(['echo', 'hi'])")class _MethT:
def echo (self): return run('echo hi')
def echo2(self): return run('echo hi')
t = _MethT()
with expect_fail(PermissionError): await pyrun("t.echo()")
with expect_fail(PermissionError): await pyrun("t.echo2()")
allow(_MethT.echo)
test_eq(await pyrun("t.echo()"), "hi")
allow({_MethT:['echo2']})
test_eq(await pyrun("t.echo2()"), "hi")@patch
def echo3(self:_MethT): return run('echo hi')
with expect_fail(PermissionError): await pyrun("t.echo3()")
allow({_MethT:['echo3']})
test_eq(await pyrun("t.echo3()"), "hi")import httpxwith expect_fail(PermissionError): await pyrun('httpx.get("http://example.org")')@allow
def getexample(): return httpx.get('http://example.org')await pyrun('getexample()')<Response [200 OK]>
def testevent():
sys.audit('pyrun.testevent')
return 'ok'with expect_fail(PermissionError): await pyrun('testevent()')@allow
def testevent2():
return testevent()testevent2()'ok'
def chk_url(obj, args, kw, data):
url = args[0] if args else kw.get('url','')
if url not in data.get('ok_urls', ()): raise PermissionError(url)pyrun_urls = RunPython(ok_urls={'http://example.org'})allow({httpx.get: chk_url})
with expect_fail(PermissionError): await pyrun_urls('httpx.get("http://example.com")')
await pyrun_urls('httpx.get("http://example.org")')<Response [200 OK]>
@allow
def _httpget(url): return httpx.get(url)def httpget(url):
sys.audit('myapp.httpget', url)
return _httpget(url)def url_allow(event, args, frame, msg, data, _frame_args):
if event=='myapp.httpget' and args[0] in data.get('ok_urls', ()): return Truepyrun_urls2 = RunPython(pre_deny=url_allow, ok_urls={'http://example.org'})
with expect_fail(PermissionError): await pyrun_urls2('httpget("http://example.com")')
await pyrun_urls2('httpget("http://example.org")')<Response [200 OK]>
Plots
allow_matplotlib
def allow_matplotlib(
):
Call self as a function.
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from matplotlib.axes import Axes
from matplotlib.axis import Axis
from matplotlib.spines import Spine, SpinesProxyallow_matplotlib()fig, ax = plt.subplots(figsize=(3,2), dpi=100)
ax.plot([1,2,1], label='Triangle')
ax.plot([1,1.5,2], label='Rising')
ax.tick_params(labelsize=10)
ax.set_xlabel('X', fontsize=10)
ax.set_ylabel('Y', fontsize=10)
ax.spines[['top','right']].set_visible(False)
ax.yaxis.set_major_formatter('{x:.0f}')
ax.legend(fontsize=12);
pyrun_tmp = RunPython(ok_dests=['/tmp'])
with expect_fail(PermissionError): await pyrun_tmp("fig.savefig(os.path.expanduser('~/plot.png'))")
await pyrun_tmp("fig.savefig('/tmp/plot.png')")load_ipython_extension
def load_ipython_extension(
ip
):
Call self as a function.
cli
def cli(
path:str <Path to script, or '-' for stdin>
):
Run a python script file in the safepyrun sandbox