_test_sentinel = True
d = _find_frame_dict('_test_sentinel')
assert '_test_sentinel' in d
d2 = _find_frame_dict('nonexistent_sentinel_xyz')
assert d2 is not Nonesafepython
Helpers and setup
_find_frame_dict walks the call stack looking for a frame whose globals contain sentinel. This lets RunPython find the caller’s namespace without requiring an explicit globals dict. If no sentinel is found, it falls back to its own module globals.
find_var
def find_var(
var:str
):
Search for var in all frames of the call stack
find_var('_test_sentinel')True
allow
def allow(
c:VAR_POSITIONAL
):
__pytools__ is the set of callable names that the sandbox allows. allow registers new entries — it accepts bare strings (for module-qualified names like 'numpy.array') or dicts mapping a class/module to a list of method names (which generates 'ClassName.method' keys).
assert 'pyrun' in __pytools__
allow('my_test_func')
assert 'my_test_func' in __pytools__
allow({str: ['zfill']})
assert 'str.zfill' in __pytools__
__pytools__.discard('my_test_func')
assert 'str.zfill' in __pytools__Write policies
chk_dest
def chk_dest(
p, ok_dests
):
chk_dest resolves a path and verifies it falls under one of the allowed destination prefixes. Raises PermissionError if not. Used by all WritePolicy subclasses.
chk_dest('/tmp/foo.txt', ['/tmp'])
try: chk_dest('/etc/passwd', ['/tmp'])
except PermissionError: print("Correctly blocked /etc/passwd")Correctly blocked /etc/passwd
OpenWritePolicy
def OpenWritePolicy(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Check open() only when mode is writable
PathWritePolicy
def PathWritePolicy(
target_pos:NoneType=None, target_kw:NoneType=None
):
Check resolved Path self, optionally also target args
PosWritePolicy
def PosWritePolicy(
pos:int=0, kw:NoneType=None
):
Check positional/keyword arg is an allowed write destination
WritePolicy
def WritePolicy(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Base for write destination policies
Three WritePolicy subclasses handle different write-checking patterns.
pp = PosWritePolicy(1, 'dst')
pp.check(None, ['src', '/tmp/ok'], {}, ['/tmp'])
try: pp.check(None, ['src', '/root/bad'], {}, ['/tmp'])
except PermissionError: print("PosWritePolicy blocked /root/bad")
pwp = PathWritePolicy()
pwp.check(Path('/tmp/f.txt'), [], {}, ['/tmp'])
try: pwp.check(Path('/etc/f.txt'), [], {}, ['/tmp'])
except PermissionError: print("PathWritePolicy blocked /etc/f.txt")
owp = OpenWritePolicy()
owp.check(None, ['/tmp/f.txt', 'w'], {}, ['/tmp'])
owp.check(None, ['/etc/passwd', 'r'], {}, ['/tmp'])
try: owp.check(None, ['/root/f.txt', 'w'], {}, ['/tmp'])
except PermissionError: print("OpenWritePolicy blocked write to /root/f.txt")PosWritePolicy blocked /root/bad
PathWritePolicy blocked /etc/f.txt
OpenWritePolicy blocked write to /root/f.txt
allow_write
def allow_write(
policies
):
Register write policies for method/function names
__pytools_write__ maps qualified callable names (like 'Path.write_text') to WritePolicy objects. allow_write registers these policies. When ok_dests is set, _safe_getattr checks this registry and wraps matching callables with _WriteChecked to enforce destination validation before the call.
allow_write({'test.Method': WritePolicy()})
assert 'test.Method' in __pytools_write__
del __pytools_write__['test.Method']_WriteChecked wraps a method so that its WritePolicy is enforced before the actual call. Returned by _safe_getattr when a callable matches a __pytools_write__ entry and ok_dests is set.
wc = _WriteChecked(Path('/tmp'), Path.exists, PathWritePolicy(), ['/tmp'])
assert callable(wc)
wc2 = _WriteChecked(Path('/etc'), Path('/etc').exists, PathWritePolicy(), ['/tmp'])
try: wc2()
except PermissionError: print("WriteChecked correctly blocked /etc")WriteChecked correctly blocked /etc
_safe_open returns a closure that checks OpenWritePolicy before delegating to the real open. Only injected into the sandbox builtins when ok_dests is set — otherwise the default open (which is already excluded from all_builtins) is not available.
so = _safe_open(['/tmp'])
f = so('/tmp/test_safe_open.txt', 'w')
f.write('test'); f.close()
so('/etc/passwd', 'r').close()
try: so('/etc/bad.txt', 'w')
except PermissionError: print("_safe_open correctly blocked write to /etc")_safe_open correctly blocked write to /etc
Builtins and wrappers
all_builtins merges RestrictedPython’s safe_builtins, utility_builtins, limited_builtins, and async support, then adds the core container types (dict, list, set, tuple, frozenset) and __import__. This is the builtins dict passed to the sandbox — anything not in here is inaccessible as a builtin.
assert all_builtins['dict'] is dict
assert all_builtins['list'] is list
assert '__import__' in all_builtins
assert 'eval' not in all_builtins
assert 'exec' not in all_builtins
list(all_builtins.keys())[:5]['__build_class__', 'None', 'False', 'True', 'abs']
_make_safe_getattr returns a closure over ok_dests that intercepts every attribute access. For callables, it checks __pytools_write__ first (wrapping with _WriteChecked if matched), then falls back to the __llmtools__|__pytools__ allow-set. Non-callables pass through unchecked.
ga = _make_safe_getattr(ok_dests=['/tmp'])
assert ga('hello', 'zfill')(10) == '00000hello'_DirectPrint is a no-op wrapper that RestrictedPython’s _print_ and _print hooks delegate to. It simply calls the real print, bypassing RestrictedPython’s default print interception.
_Uncallable wraps a callable to raise PermissionError on call, while still exposing its non-callable attributes (like __name__). This lets the sandbox expose objects for inspection without letting users invoke them.
_callable_ok checks whether a callable should be allowed — it’s ok if its name ends with _ (user-exported), is in the allow-set directly, or its module.qualname is registered.
uc = _Uncallable(len, 'len')
assert repr(uc) == repr(len)
try: uc([1,2,3])
except PermissionError: print("_Uncallable correctly blocked call to len")
_ok = {'test_func_'}
assert _callable_ok('test_func_', lambda: None, _ok)
assert not _callable_ok('secret', lambda: None, _ok)
assert not _callable_ok('_private', lambda: None, _ok)_Uncallable correctly blocked call to len
SafeTransformer
def SafeTransformer(
errors:NoneType=None, warnings:NoneType=None, used_names:NoneType=None
):
A :class:NodeVisitor subclass that walks the abstract syntax tree and allows modification of nodes.
The NodeTransformer will walk the AST and use the return value of the visitor methods to replace or remove the old node. If the return value of the visitor method is None, the node will be removed from its location, otherwise it is replaced with the return value. The return value may be the original node in which case no replacement takes place.
Here is an example transformer that rewrites all occurrences of name lookups (foo) to data['foo']::
class RewriteName(NodeTransformer):
def visit_Name(self, node):
return Subscript(
value=Name(id='data', ctx=Load()),
slice=Constant(value=node.id),
ctx=node.ctx
)
Keep in mind that if the node you’re operating on has child nodes you must either transform the child nodes yourself or call the :meth:generic_visit method for the node first.
For nodes that were part of a collection of statements (that applies to all statement nodes), the visitor may also return a list of nodes rather than just a single node.
Usually you use the transformer like this::
node = YourTransformer().visit(node)
SafeTransformer extends RestrictedPython’s RestrictingNodeTransformer to rewrite attribute access. Loads become _getattr_(obj, name) calls (enabling callable checks), stores/deletes become _write_(obj).attr = val (enabling mutation control). Private attrs (starting with _) are blocked except for a curated ALLOWED_DUNDERS set.
Main implementation
_run_python is the core sandbox executor. It compiles code with SafeTransformer, sets up the restricted globals (builtins, getattr hook, tools), handles the last-expression-as-return-value pattern, captures stdout/stderr, and exports _-suffixed locals back to the caller’s namespace.
RunPython
def RunPython(
g:NoneType=None, sentinel:NoneType=None, ok_dests:NoneType=None
):
Initialize self. See help(type(self)) for accurate signature.
RunPython is the public API. It captures the caller’s globals via _find_frame_dict, optionally takes ok_dests for write-checking, and generates its docstring dynamically from the current __llmtools__|__pytools__ set so the LLM always sees an up-to-date tool list.
pyrun = RunPython()await pyrun('[]')[]
await pyrun("print('tt')")'tt\n'
await pyrun("print('tt')", concise=False){'stdout': 'tt\n'}
# Unpacking is allowed
await pyrun("""
a = [1,2,3]
print(*a)
""")'1 2 3\n'
def f(): warnings.warn('a warning')
allow('f')
await pyrun('print("asdf"); f(); 1+1'){'stdout': 'asdf\n',
'stderr': "/var/folders/51/b2_szf2945n072c0vj2cyty40000gn/T/ipykernel_24286/3774884187.py:1: UserWarning: a warning\n def f(): warnings.warn('a warning')\n",
'result': 2}
Standard allows
safe_type
def safe_type(
o:object
):
Same as type(o)
Config
safepyrun loads an optional user config from {xdg_config_home}/safepyrun/config.py at import time, after all defaults are registered. This lets users permanently extend the sandbox allowlists without modifying the package. The config file is executed with all safepyrun.core globals already available — no imports needed. This includes allow, allow_write, WritePolicy, PathWritePolicy, PosWritePolicy, OpenWritePolicy, and all standard library modules already imported by the module.
Example ~/.config/safepyrun/config.py (Linux) or ~/Library/Application Support/safepyrun/config.py (macOS):
# Add pandas tools
allow({pandas.DataFrame: ['head', 'describe', 'info', 'shape']})
# Allow writing to ~/data
allow_write({'Path.write_text': PathWritePolicy()})If the config file has errors, a warning is emitted and the defaults remain intact.
Examples
await pyrun('''
a = {"b":1}
list(a.items())
''')[('b', 1)]
await pyrun('Path().exists()')True
await pyrun("os.path.join('/foo', 'bar', 'baz.py')")'/foo/bar/baz.py'
await pyrun('a_=3')
a_3
await pyrun('''aa_='33' ''')
await pyrun('''len(aa_) ''')2
def g(): ...await pyrun('inspect.getsource(g)')'def g(): ...\n'
try: await pyrun('g()')
except PermissionError: print("Correct exception raised")
else: raise Exception("No exception")Correct exception raised
await pyrun('re.compile("a")')re.compile(r'a', re.UNICODE)
from re import compileawait pyrun('compile("a")')re.compile(r'a', re.UNICODE)
await pyrun('''
dict(a=safe_type(1))
'''){'a': int}
await pyrun("""
async def agen():
for x in [1,2]: yield x
res = []
async for x in agen(): res.append(x)
res
""")[1, 2]
await pyrun('''
import asyncio
async def fetch(n): return n * 10
print(string.ascii_letters)
await asyncio.gather(fetch(1), fetch(2), fetch(3))
'''){'stdout': 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\n',
'result': [10, 20, 30]}
import numpy as npallow('numpy.array', 'numpy.ndarray.sum')
await pyrun('import numpy as np; np.array([1,2,3]).sum()')6
Write policy examples
pyrun2 = RunPython(ok_dests=['/tmp'])await pyrun2("Path('/tmp/test_write.txt').write_text('hello')")5
try: await pyrun2("Path('/etc/evil.txt').write_text('bad')")
except PermissionError as e: print(f'Blocked: {e}')Blocked: Write to '/etc/evil.txt' not allowed; permitted: ['/tmp']
await pyrun2("open('/tmp/test_open.txt', 'w').write('hi')")2
try: await pyrun2("open('/root/bad.txt', 'w')")
except PermissionError as e: print(f'Blocked: {e}')Blocked: Write to '/root/bad.txt' not allowed; permitted: ['/tmp']
await pyrun2("open('/etc/passwd', 'r').read(10)")'##\n# User '
await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/tmp/test_copy.txt')")'/tmp/test_copy.txt'
try: await pyrun2("import shutil; shutil.copy('/tmp/test_write.txt', '/root/bad.txt')")
except PermissionError as e: print(f'Blocked: {e}')Blocked: Write to '/root/bad.txt' not allowed; permitted: ['/tmp']
try: await pyrun("Path('/tmp/test.txt').write_text('nope')")
except AttributeError as e: print(f'No ok_dests: {e}')No ok_dests: Cannot access callable: write_text
pyrun_cwd = RunPython(ok_dests=['.'])
# Writing to cwd should work
await pyrun_cwd("Path('test_cwd_ok.txt').write_text('hello')")5
Path('test_cwd_ok.txt').unlink(missing_ok=True)# Writing to /tmp should be blocked (not in ok_dests)
try: await pyrun_cwd("Path('/tmp/nope.txt').write_text('bad')")
except PermissionError: print("Blocked /tmp as expected")Blocked /tmp as expected
# Parent traversal should be blocked
try: await pyrun_cwd("Path('../escape.txt').write_text('bad')")
except PermissionError: print("Blocked ../ as expected")Blocked ../ as expected
# Sneaky traversal via subdir/../../ should also be blocked
try: await pyrun_cwd("Path('subdir/../../escape.txt').write_text('bad')")
except PermissionError: print("Blocked subdir/../../ as expected")Blocked subdir/../../ as expected
try: await pyrun_cwd("open('../bad_open.txt', 'w')")
except PermissionError: print("Blocked open ../ as expected")Blocked open ../ as expected