Bashxtract API

Extract commands used from bash command lines

Introduction

safecmd.bashxtract provides tools for parsing and extracting commands from bash command strings. It’s designed for security-conscious applications where you need to understand exactly what commands a shell script will execute before running it.

The core use case is validating shell commands from untrusted sources (like LLM-generated commands) against an allowlist. Rather than trying to regex-match bash syntax—which is notoriously tricky—this module uses shfmt, a proper bash parser, to build an AST and then extracts all executable commands from it.

In most cases extract_commands is the only function you will need. The rest of this page documents the building blocks it is assembled from, in the order they are used.

The Problem

Parsing bash commands is surprisingly tricky. You might think shlex.split would work:

import shlex

cmd = '''
echo | head 2 <<EOF
asdf
jkljl
EOF
'''
shlex.split(cmd)
['echo', '|', 'head', '2', '<<EOF', 'asdf', 'jkljl', 'EOF']

But shlex doesn’t understand bash syntax—it treats |, <<EOF, and the heredoc content as regular arguments. It can’t tell us that echo and head are separate commands in a pipeline, or that the heredoc content is input, not an argument.
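A quick way to see the limitation, using only the standard library: once shlex has stripped the quotes, a real pipe and a quoted literal | yield identical tokens, so no later check can tell the two apart. This is a small illustration, not part of safecmd.

```python
import shlex

piped = shlex.split("echo a | cat")      # a real pipeline: two commands
quoted = shlex.split("echo a '|' cat")   # one command with a literal '|' argument

print(piped)
print(quoted)
print(piped == quoted)  # the distinction is lost after tokenizing
```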

Fortunately, there’s a proper bash parser called shfmt that can parse bash into a JSON AST (Abstract Syntax Tree). The shfmt-py package (installed automatically with safecmd) provides the binary:

!shfmt --help 2>&1 | head -6
usage: shfmt [flags] [path ...]

shfmt formats shell programs. If the only argument is a dash ('-') or no
arguments are given, standard input will be used. If a given path is a
directory, all shell scripts found under that directory will be used.

…and this is the key flag that we will use:

!shfmt --help 2>&1 | grep to-json
  --to-json           print syntax tree to stdout as a typed JSON

As we’ll see below, the AST that shfmt creates represents the structure of the bash command as nested dictionaries. Each node has a Type field telling us what kind of construct it is—CallExpr for a command invocation, BinaryCmd for pipelines and logical operators, Word for arguments, and so on. From this, we can pull out just the information we need.
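To get a feel for the node types, the sketch below walks a nested dict and counts every Type value it meets. The AST is a hand-trimmed fragment for echo hello with position fields left out, and node_types is a hypothetical helper written for this illustration, not a safecmd function.

```python
from collections import Counter

def node_types(node):
    "Yield the Type of every dict node in a nested AST, depth-first."
    if isinstance(node, dict):
        if 'Type' in node:
            yield node['Type']
        for value in node.values():
            yield from node_types(value)
    elif isinstance(node, list):
        for item in node:
            yield from node_types(item)

# Hand-trimmed AST for `echo hello` (position fields removed for brevity)
ast = {'Type': 'File', 'Stmts': [{'Cmd': {'Type': 'CallExpr', 'Args': [
    {'Parts': [{'Type': 'Lit', 'Value': 'echo'}]},
    {'Parts': [{'Type': 'Lit', 'Value': 'hello'}]}]}}]}

type_counts = Counter(node_types(ast))
print(type_counts)
```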

For our heredoc example, what we ultimately want is to extract:

  1. The commands: ['echo'] and ['head', '2']
  2. The operators used: {'|'} (a pipe)
  3. The heredoc content attached to the command that receives it

So our goal is to walk this AST and produce a simple list of commands with their arguments, plus a set of operators—something we can easily validate against an allowlist.
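Once commands and operators are in that shape, the allowlist check itself can be very small. The following is a minimal sketch under stated assumptions: ALLOWED_CMDS, ALLOWED_OPS and is_allowed are hypothetical names chosen for this example, and a real policy would probably also inspect arguments.

```python
ALLOWED_CMDS = {'echo', 'head', 'cat', 'grep'}   # hypothetical allowlist
ALLOWED_OPS = {'|', '&&', ';'}                   # hypothetical allowlist

def is_allowed(commands, ops):
    "True only if every command name and every operator is allowlisted."
    names_ok = all(c and c[0] in ALLOWED_CMDS for c in commands)
    return names_ok and ops <= ALLOWED_OPS

heredoc_ok = is_allowed([['echo'], ['head', '2']], {'|'})
rm_ok = is_allowed([['echo', 'hi'], ['rm', '-rf', '/']], {';'})
print(heredoc_ok, rm_ok)
```

Checking the set of operators with a subset test means an unexpected operator fails closed, even when every command name is allowed.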

The module is structured in layers:

  1. Parsing layer (parse_bash): Converts bash syntax to a JSON AST using shfmt
  2. Text extraction (part_text, word_text): Reconstructs text values from AST nodes, handling quotes, escapes, and expansions
  3. AST walking (visit_stmts, nested_stmts): Recursively traverses the AST to find all commands, including those nested in substitutions
  4. Operator detection (collect_ops): Identifies shell operators like pipes, redirects, and logical operators
  5. Validation (check_types): Ensures we only process bash constructs we understand
  6. Main API (extract_commands): Combines everything into a simple interface

Parsing


parse_bash


def parse_bash(
    cmd:str, shfmt:str='shfmt'
):

Parse cmd using shfmt

Parses a bash command string using shfmt --to-json and returns the AST as a Python dict. Raises ValueError if the command has syntax errors. Requires the shfmt binary to be available.

parse_bash('echo hello')
{'Type': 'File',
 'Pos': {'Offset': 0, 'Line': 1, 'Col': 1},
 'End': {'Offset': 10, 'Line': 1, 'Col': 11},
 'Stmts': [{'Pos': {'Offset': 0, 'Line': 1, 'Col': 1},
   'End': {'Offset': 10, 'Line': 1, 'Col': 11},
   'Cmd': {'Type': 'CallExpr',
    'Pos': {'Offset': 0, 'Line': 1, 'Col': 1},
    'End': {'Offset': 10, 'Line': 1, 'Col': 11},
    'Args': [{'Pos': {'Offset': 0, 'Line': 1, 'Col': 1},
      'End': {'Offset': 4, 'Line': 1, 'Col': 5},
      'Parts': [{'Type': 'Lit',
        'Pos': {'Offset': 0, 'Line': 1, 'Col': 1},
        'End': {'Offset': 4, 'Line': 1, 'Col': 5},
        'ValuePos': {'Offset': 0, 'Line': 1, 'Col': 1},
        'ValueEnd': {'Offset': 4, 'Line': 1, 'Col': 5},
        'Value': 'echo'}]},
     {'Pos': {'Offset': 5, 'Line': 1, 'Col': 6},
      'End': {'Offset': 10, 'Line': 1, 'Col': 11},
      'Parts': [{'Type': 'Lit',
        'Pos': {'Offset': 5, 'Line': 1, 'Col': 6},
        'End': {'Offset': 10, 'Line': 1, 'Col': 11},
        'ValuePos': {'Offset': 5, 'Line': 1, 'Col': 6},
        'ValueEnd': {'Offset': 10, 'Line': 1, 'Col': 11},
        'Value': 'hello'}]}]},
   'Position': {'Offset': 0, 'Line': 1, 'Col': 1}}]}
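For orientation, here is one way the parsing step could be written, together with a small helper that drops the position bookkeeping so the tree is easier to read. This is a sketch and not the library's actual implementation: parse_bash_sketch and strip_pos are hypothetical names, and it assumes shfmt reads the command from standard input and exits non-zero on a syntax error.

```python
import json
import shutil
import subprocess

def parse_bash_sketch(cmd, shfmt='shfmt'):
    "Run `shfmt --to-json` with `cmd` on stdin and decode the JSON AST."
    res = subprocess.run([shfmt, '--to-json'], input=cmd,
                         capture_output=True, text=True)
    if res.returncode != 0:
        raise ValueError(res.stderr.strip() or 'shfmt failed')
    return json.loads(res.stdout)

POS_KEYS = {'Pos', 'End', 'ValuePos', 'ValueEnd', 'Position'}

def strip_pos(node):
    "Return a copy of the AST without position fields (for display only)."
    if isinstance(node, dict):
        return {k: strip_pos(v) for k, v in node.items() if k not in POS_KEYS}
    if isinstance(node, list):
        return [strip_pos(v) for v in node]
    return node

if shutil.which('shfmt'):  # only runs where the binary is installed
    print(strip_pos(parse_bash_sketch('echo hello')))
```

The stripped tree is only for reading. The offsets are still needed later, because substitutions are recovered by slicing the original command string.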

Text Extraction


part_text


def part_text(
    p, cmd
):

Extracts the text value from a single word part node in the shfmt AST.

Handles literals (with backslash-space unescaping), single/double quoted strings, parameter expansions ($var, ${var}, ${arr[0]}), and command/process substitutions. For substitutions, returns the original source text using offset positions.

part_text({'Type': 'SglQuoted', 'Value': 'foo bar'}, "echo 'foo bar'")
'foo bar'

word_text


def word_text(
    w, cmd
):

Converts a Word node (with Parts) into its full text representation by concatenating part_text for each part.

word_text({'Parts': [{'Type': 'Lit', 'Value': 'hello'}]}, 'echo hello')
'hello'
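The two helpers above can be approximated in a few lines. The sketch below is an assumption-laden simplification rather than the module's code: it handles Lit, SglQuoted and DblQuoted directly and falls back to slicing the original command between the Pos and End offsets for everything else, which is how the description above says substitutions are treated. The _sketch names are hypothetical.

```python
def part_text_sketch(p, cmd):
    "Text for one word part; other part types fall back to the source slice."
    kind = p.get('Type')
    if kind == 'Lit':
        return p['Value'].replace('\\ ', ' ')   # unescape backslash-space
    if kind == 'SglQuoted':
        return p['Value']
    if kind == 'DblQuoted':
        return ''.join(part_text_sketch(q, cmd) for q in p.get('Parts', []))
    # Expansions and substitutions: copy the original text between Pos and End
    return cmd[p['Pos']['Offset']:p['End']['Offset']]

def word_text_sketch(w, cmd):
    "Concatenate the text of every part in a Word node."
    return ''.join(part_text_sketch(p, cmd) for p in w['Parts'])

cmd = 'echo pre$(whoami)'
word = {'Parts': [
    {'Type': 'Lit', 'Value': 'pre'},
    {'Type': 'CmdSubst', 'Pos': {'Offset': 8}, 'End': {'Offset': 17}},
]}
text = word_text_sketch(word, cmd)
print(text)
```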

AST Walking


nested_stmts


def nested_stmts(
    parts
):

Yield all Stmts lists from nested Parts recursively

Generator that recursively yields all Stmts lists found within nested Parts arrays. Used to find command substitutions ($(...)) and process substitutions (<(...)) at any nesting depth, including those inside double-quoted strings.

parts = [{'Type': 'CmdSubst', 'Stmts': [{'Cmd': {...}}]}]
list(nested_stmts(parts))
[[{'Cmd': {Ellipsis}}]]
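The recursion behind this can be sketched as a short generator. This is an illustrative version with a hypothetical name, not the module's code: it yields a part's Stmts when present and then descends into that part's own Parts, which is what lets it see a substitution wrapped in a double-quoted string.

```python
def nested_stmts_sketch(parts):
    "Yield every Stmts list reachable through nested Parts."
    for p in parts or []:
        if not isinstance(p, dict):
            continue
        if p.get('Stmts'):
            yield p['Stmts']
        yield from nested_stmts_sketch(p.get('Parts'))

# A command substitution inside a double-quoted string: "hello $(...)"
inner = [{'Cmd': {'Type': 'CallExpr'}}]
parts = [{'Type': 'DblQuoted', 'Parts': [
    {'Type': 'Lit', 'Value': 'hello '},
    {'Type': 'CmdSubst', 'Stmts': inner}]}]

found = list(nested_stmts_sketch(parts))
print(found)
```

The sketch does not look inside the yielded statements themselves. It assumes the caller visits each yielded list, which is how deeper nesting such as $(cat $(ls)) would be reached.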

visit_stmts


def visit_stmts(
    stmts, cmd, commands:NoneType=None
):

Visit statements, appending commands and handling redirects

Walks a list of statement nodes from the shfmt AST, extracting all commands (including nested ones) into the commands list. Each command is represented as [cmd, arg1, arg2, ...]. Handles redirects by appending heredoc/here-string content to the most recent command. Returns the commands list.

parsed = parse_bash('echo foo; cat file')
visit_stmts(parsed['Stmts'], 'echo foo; cat file')
[['echo', 'foo'], ['cat', 'file']]
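A stripped-down version of the walk might look like the sketch below. It is not the library's implementation: it joins literal parts only, ignores redirects and substitutions, and assumes that BinaryCmd nodes hold their two sides under X and Y and that subshells appear as Subshell nodes with a Stmts list, following shfmt's Go syntax package. All function names here are hypothetical.

```python
def lit_word(w):
    "Join literal parts only; a stand-in for the full text extraction."
    return ''.join(p.get('Value', '') for p in w.get('Parts', []))

def visit_stmts_sketch(stmts, commands=None):
    "Append each simple command as [name, arg, ...], in source order."
    if commands is None:
        commands = []
    for stmt in stmts or []:
        node = stmt.get('Cmd') or {}
        kind = node.get('Type')
        if kind == 'CallExpr':
            args = node.get('Args') or []
            if args:
                commands.append([lit_word(w) for w in args])
        elif kind == 'BinaryCmd':      # pipeline or &&/||: visit both sides
            visit_stmts_sketch([node['X'], node['Y']], commands)
        elif kind == 'Subshell':       # ( ... ): visit the inner statements
            visit_stmts_sketch(node.get('Stmts'), commands)
    return commands

def call(*words):
    "Build a minimal statement node for a simple command (demo helper)."
    return {'Cmd': {'Type': 'CallExpr',
                    'Args': [{'Parts': [{'Type': 'Lit', 'Value': w}]} for w in words]}}

# Hand-built equivalent of: echo foo; cat file | grep x
pipeline = {'Cmd': {'Type': 'BinaryCmd', 'X': call('cat', 'file'), 'Y': call('grep', 'x')}}
result = visit_stmts_sketch([call('echo', 'foo'), pipeline])
print(result)
```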

collect_ops


def collect_ops(
    node, ops:NoneType=None
):

Walk AST node and collect all operators into a set

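Detects &, ;, logical AND/OR (&&/||), pipe (|), and redirections (such as >, >>, and <) by checking the Op field against OP_MAP and the Background/Semicolon boolean flags. The extract_commands tests further down also show |&, &>, &>>, >&, <& and = being reported.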

collect_ops(parse_bash('echo a && echo b | cat > out.txt'))
{'&&', '>', '|'}
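The detection logic described above amounts to a generic tree walk. In the sketch below, DEMO_OP_MAP uses placeholder integer codes invented for this example; they are not the values shfmt emits, and the real OP_MAP should be consulted for those. The function name is hypothetical as well.

```python
DEMO_OP_MAP = {901: '&&', 902: '|'}   # placeholder codes, NOT shfmt's real values

def collect_ops_sketch(node, ops=None):
    "Walk any nested dict/list structure and gather operator strings."
    if ops is None:
        ops = set()
    if isinstance(node, dict):
        op = node.get('Op')
        if isinstance(op, int) and op in DEMO_OP_MAP:
            ops.add(DEMO_OP_MAP[op])
        if node.get('Background'):
            ops.add('&')
        if node.get('Semicolon'):
            ops.add(';')
        for value in node.values():
            collect_ops_sketch(value, ops)
    elif isinstance(node, list):
        for item in node:
            collect_ops_sketch(item, ops)
    return ops

# Hand-built tree: a pipeline statement ended by ';', then a background job
tree = {'Stmts': [
    {'Semicolon': True, 'Cmd': {'Type': 'BinaryCmd', 'Op': 902, 'X': {}, 'Y': {}}},
    {'Background': True, 'Cmd': {'Type': 'CallExpr'}}]}
demo_ops = collect_ops_sketch(tree)
print(sorted(demo_ops))
```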

collect_redirects


def collect_redirects(
    node, cmd, redirects:NoneType=None
):

Walk AST node and collect all write redirect destinations as (op, dest) tuples

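collect_redirects extracts all write redirect destinations from the AST. Each output redirect (>, >>, &>, &>>, >&) contributes one (operator, destination) tuple, and the tuples are returned as a list in source order. Input redirects and file-descriptor duplications such as >&2 are not reported, as the tests on this page show. This is used to validate that commands only write to allowed destinations.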

cmd = 'echo a && echo b | cat > out.txt'
collect_redirects(parse_bash(cmd), cmd)
[('>', 'out.txt')]

from fastcore.test import test_eq
def _redirects(cmd): return collect_redirects(parse_bash(cmd), cmd)

test_eq(_redirects('echo hi > out.txt'), [('>', 'out.txt')])
test_eq(_redirects('echo hi >> log.txt'), [('>>', 'log.txt')])
test_eq(_redirects('echo hi &> both.txt'), [('&>', 'both.txt')])
test_eq(_redirects('cat > a > b'), [('>', 'a'), ('>', 'b')])
test_eq(_redirects('echo hi | cat'), [])  # no redirects
test_eq(_redirects('echo hi < in.txt'), [])  # input redirect, not output
test_eq(_redirects('cmd > "$HOME/file"'), [('>', '$HOME/file')])  # variable in dest
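What a caller does with these tuples is outside this module, but a destination check could look like the sketch below. Everything in it is an assumption for illustration: dest_allowed, the /work working directory and the allowed directories are hypothetical, and the check is purely lexical, so it does not resolve symlinks.

```python
import posixpath

def dest_allowed(dest, allowed_dirs, cwd='/work'):
    "True if `dest` resolves lexically to a path inside one of `allowed_dirs`."
    if dest.startswith('~') or '$' in dest or '`' in dest:
        return False   # needs shell expansion, so the real target is unknown
    full = posixpath.normpath(posixpath.join(cwd, dest))
    return any(posixpath.commonpath([full, d]) == d for d in allowed_dirs)

redirects = [('>', 'out.txt'), ('>>', '/tmp/../etc/passwd'), ('>', '$HOME/file')]
checks = [dest_allowed(dest, ['/work', '/tmp']) for _, dest in redirects]
print(checks)
```

Normalizing before comparing matters here: /tmp/../etc/passwd starts with an allowed prefix as a string, but resolves outside it.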

scan_flag_args


def scan_flag_args(
    commands, exec_flags:NoneType=None, dest_flags:NoneType=None
):

Scan commands for exec/dest flags and extract their arguments

scan_flag_args scans a list of extracted commands for special flags that take command or destination arguments. It takes two dicts mapping command names to sets of flags:

  • exec_flags: flags whose next arg is a command to validate (e.g., {'find': {'-exec', '-execdir'}})
  • dest_flags: flags whose next arg is a destination to validate (e.g., {'curl': {'-o', '--output'}})

Returns (extra_cmds, extra_dests) where extra_cmds is a list of command strings to parse recursively, and extra_dests is a list of (flag, dest) tuples to validate as redirect destinations.

exec_flags = {'find': {'-exec', '-execdir'}}
dest_flags = {'curl': {'-o', '--output'}}

# Find with -exec extracts the command arg
scan_flag_args([['find', '.', '-exec', 'ls', '{}', ';']], exec_flags=exec_flags)
(['ls'], [])
# curl with -o extracts the destination
scan_flag_args([['curl', '-o', '/tmp/file', 'http://example.com']], dest_flags=dest_flags)
([], [('-o', '/tmp/file')])
# cat -o is NOT treated as a dest flag (not in dest_flags for cat)
scan_flag_args([['cat', '-o', '/etc/passwd']], dest_flags=dest_flags)
([], [])
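The behaviour in these examples can be reproduced with a pairwise scan over each command's arguments. This is a sketch with a hypothetical name, not the module's code, and it only covers the "flag followed by a separate argument" form; a spelling such as --output=file would need extra handling.

```python
def scan_flag_args_sketch(commands, exec_flags=None, dest_flags=None):
    "Return (extra_cmds, extra_dests) found after known flags."
    exec_flags, dest_flags = exec_flags or {}, dest_flags or {}
    extra_cmds, extra_dests = [], []
    for command in commands:
        if not command:
            continue
        name, args = command[0], command[1:]
        for flag, nxt in zip(args, args[1:]):   # each arg paired with its successor
            if flag in exec_flags.get(name, ()):
                extra_cmds.append(nxt)
            if flag in dest_flags.get(name, ()):
                extra_dests.append((flag, nxt))
    return extra_cmds, extra_dests

demo_exec = {'find': {'-exec', '-execdir'}}
demo_dest = {'curl': {'-o', '--output'}}
found_exec = scan_flag_args_sketch([['find', '.', '-exec', 'ls', '{}', ';']], exec_flags=demo_exec)
found_dest = scan_flag_args_sketch([['curl', '-o', '/tmp/file', 'http://example.com']], dest_flags=demo_dest)
print(found_exec, found_dest)
```

Keying the flag sets by command name is what keeps cat -o from being mistaken for an output flag.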

check_types


def check_types(
    node
):

Raise ValueError if AST contains unhandled node types

Raises ValueError if any node has a Type not in HANDLED_TYPES. Use this to detect unsupported bash constructs early, ensuring the rest of the parsing pipeline won’t silently skip or mishandle unknown syntax.

check_types(parse_bash('echo hello'))
try: check_types(parse_bash('[[ -f foo ]]'))
except ValueError: print('Caught unhandled construct')
Caught unhandled construct
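The fail-closed idea can be sketched as another recursive walk. HANDLED_DEMO below is an illustrative subset made up for this example, not the module's HANDLED_TYPES, and the TestClause name for [[ ... ]] is assumed from shfmt's Go syntax package.

```python
HANDLED_DEMO = {'File', 'CallExpr', 'BinaryCmd', 'Lit', 'SglQuoted', 'DblQuoted'}  # illustrative subset

def check_types_sketch(node, handled=HANDLED_DEMO):
    "Raise ValueError on the first node whose Type is not in `handled`."
    if isinstance(node, dict):
        kind = node.get('Type')
        if kind is not None and kind not in handled:
            raise ValueError(f'Unhandled node type: {kind}')
        for value in node.values():
            check_types_sketch(value, handled)
    elif isinstance(node, list):
        for item in node:
            check_types_sketch(item, handled)

ok = {'Type': 'File', 'Stmts': [{'Cmd': {'Type': 'CallExpr', 'Args': []}}]}
bad = {'Type': 'File', 'Stmts': [{'Cmd': {'Type': 'TestClause'}}]}

check_types_sketch(ok)          # passes silently
message = None
try:
    check_types_sketch(bad)
except ValueError as err:
    message = str(err)
print(message)
```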

Main API


extract_commands


def extract_commands(
    cmd, shfmt:str='shfmt', exec_flags:NoneType=None, dest_flags:NoneType=None
):

Split bash command into (commands, operators, redirects)

extract_commands(cmd, exec_flags=None, dest_flags=None) parses a bash command string and returns a 3-tuple of:

  1. A list of all commands that would be executed (including nested ones)
  2. A set of operators used in the command
  3. A list of write redirect destinations as (op, dest) tuples

Each command is represented as a list of tokens (strings), similar to the output of shlex.split(). The function uses shfmt to parse the bash syntax into an AST, then extracts all executable commands recursively.

Optional parameters:

  • exec_flags: dict mapping command names to sets of flags whose next arg is a command (e.g., {'find': {'-exec'}}). These are parsed recursively and added to commands.
  • dest_flags: dict mapping command names to sets of flags whose next arg is a destination (e.g., {'curl': {'-o'}}). These are added to redirects.

Handled constructs:

  • Simple commands: echo foo → [['echo', 'foo']]
  • Pipelines: cat file | grep x → [['cat', 'file'], ['grep', 'x']]
  • Sequences (;, &, &&, ||): echo a; echo b → [['echo', 'a'], ['echo', 'b']]
  • Command substitution: echo $(whoami) → [['echo', '$(whoami)'], ['whoami']]
  • Backtick substitution: echo `whoami` → [['echo', '`whoami`'], ['whoami']]
  • Process substitution: diff <(ls a) <(ls b) → [['diff', ...], ['ls', 'a'], ['ls', 'b']]
  • Subshells: (cd /tmp && rm *) → [['cd', '/tmp'], ['rm', '*']]
  • Nested substitutions are extracted recursively
  • Heredocs (<<EOF) and here-strings (<<<) have their content inlined as a single token
  • Quoted strings and escaped spaces are handled correctly, preserving them as single tokens
  • Output redirects: echo hi > file.txt → redirects: [('>', 'file.txt')]
  • Exec flags: find . -exec ls with exec_flags={'find': {'-exec'}} → adds ['ls'] to commands
  • Dest flags: curl -o /tmp/f url with dest_flags={'curl': {'-o'}} → adds ('-o', '/tmp/f') to redirects

The tests below show the full behavior:

from fastcore.test import test_eq

def test_split(a, *b, ops=set(), redirs=[]):
    test_eq(extract_commands(a), (list(b), ops, redirs))

test_split('echo <<EOF\nasdf\njkljl\nEOF\n', ['echo', 'asdf\njkljl'])
test_split('echo $(foo)', ['echo', '$(foo)'], ['foo'])
test_split('echo $(foo) | cat -a', ['echo', '$(foo)'], ['foo'], ['cat', '-a'], ops={'|'})
test_split('echo $(cat $(ls))', ['echo', '$(cat $(ls))'], ['cat', '$(ls)'], ['ls'])
test_split('echo "hello world" foo', ['echo', 'hello world', 'foo'])
test_split('echo hello\\ world', ['echo', 'hello world'])
test_split('echo foo; echo bar', ['echo', 'foo'], ['echo', 'bar'], ops={';'})
test_split('echo $HOME "${USER}"', ['echo', '$HOME', '${USER}'])
test_split('sleep 10 &', ['sleep', '10'], ops={';', '&'})
test_split('cat <<< "some text"', ['cat', '<<<', 'some text'])
test_split("echo \"it's a 'test'\"", ['echo', "it's a 'test'"])
test_split('echo "hello $(whoami) there"', ['echo', 'hello $(whoami) there'], ['whoami'])
test_split('echo "path is ${HOME}/bin"', ['echo', 'path is ${HOME}/bin'])
test_split('echo ${arr[0]}', ['echo', '${arr[0]}'])
test_split('echo "$(echo "inner")"', ['echo', '$(echo "inner")'], ['echo', 'inner'])
test_split('echo "$HOME/$(whoami)/file"', ['echo', '$HOME/$(whoami)/file'], ['whoami'])
test_split('echo `whoami`', ['echo', '`whoami`'], ['whoami'])
test_split('(cd /tmp && rm -rf *)', ['cd', '/tmp'], ['rm', '-rf', '*'], ops={'&&'})
test_split('eval "rm -rf /"', ['eval', 'rm -rf /'])
test_split('echo a && echo b || echo c', ['echo', 'a'], ['echo', 'b'], ['echo', 'c'], ops={'&&', '||'})
test_split('cat file > out', ['cat', 'file'], ops={'>'}, redirs=[('>', 'out')])
test_split('cat file >> out', ['cat', 'file'], ops={'>>'}, redirs=[('>>', 'out')])
test_split('cat < in', ['cat'], ops={'<'})
test_split('diff <(ls dir1) <(ls dir2)',
    ['diff', '<(ls dir1)', '<(ls dir2)'], ['ls', 'dir1'], ['ls', 'dir2'])
test_split('FOO=bar', ops={'='})
test_split('FOO=bar echo hello', ['echo', 'hello'], ops={'='})
test_split('echo &>file', ['echo'], ops={'&>'}, redirs=[('&>', 'file')])
test_split('echo &>>file', ['echo'], ops={'&>>'}, redirs=[('&>>', 'file')])
test_split('echo |& cat', ['echo'], ['cat'], ops={'|&'})

# fd duplication - not file redirects, so no redirs
test_split('echo >&2', ['echo'], ops={'>&'})
test_split('cat <&3', ['cat'], ops={'<&'})

exec_flags = {'find': {'-exec', '-execdir'}, 'tar': {'--to-command', '-I'}}
dest_flags = {'curl': {'-o', '--output'}}

def test_split_flags(a, *b, ops=set(), redirs=[], exec_f=exec_flags, dest_f=dest_flags):
    test_eq(extract_commands(a, exec_flags=exec_f, dest_flags=dest_f), (list(b), ops, redirs))

# find -exec extracts and validates the command
test_split_flags('find . -exec ls', ['find', '.', '-exec', 'ls'], ['ls'])
test_split_flags(r'find . -exec rm -rf {} \;', ['find', '.', '-exec', 'rm', '-rf', '{}', r'\;'], ['rm'])
test_split_flags(r'find . -execdir cat {} \;', ['find', '.', '-execdir', 'cat', '{}', r'\;'], ['cat'])

# curl -o extracts the destination
test_split_flags('curl -o /tmp/out http://x', ['curl', '-o', '/tmp/out', 'http://x'], redirs=[('-o', '/tmp/out')])
test_split_flags('curl --output file.txt http://x', ['curl', '--output', 'file.txt', 'http://x'], redirs=[('--output', 'file.txt')])

# cat -o is NOT a dest flag (cat not in dest_flags)
test_split_flags('cat -o /etc/passwd', ['cat', '-o', '/etc/passwd'])

# Nested: find -exec with a pipeline inside
test_split_flags('find . -exec "ls | head"', ['find', '.', '-exec', 'ls | head'], ['ls'], ['head'], ops={'|'})