Several supply chain attacks, notably in the Python and JavaScript ecosystems, exploit install-time hooks to perform malicious activity [1, 2]. Install-time hooks allow running arbitrary code before or after a package is installed. Because such attacks trigger during installation, without a developer ever importing or using the package, they are an attractive method for attackers. The most common behaviour observed in known supply chain attacks is data exfiltration [1, 2]. Common targets include SSH keys, passwords, dotfiles and environment variables.

A bit about how pip works

Python packages are distributed in two primary formats: wheels (.whl) and source distributions (.tar.gz). Many packages publish both, and pip prefers the .whl artifact over the .tar.gz unless you pass --no-binary to pip install.
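As a toy model of that preference, here is a hypothetical `pick_artifact` helper. It is not pip's code: real pip also checks each wheel's compatibility tags (Python version, ABI, platform), and `--no-binary` takes a value such as a package name or `:all:`.

```python
from typing import Iterable, Optional


def pick_artifact(filenames: Iterable[str], no_binary: bool = False) -> Optional[str]:
    """Toy version of pip's format preference for a single release.

    Picks a wheel when one exists (unless no_binary is set), otherwise an sdist.
    Wheel tag compatibility checks are deliberately left out.
    """
    names = list(filenames)
    wheels = [name for name in names if name.endswith(".whl")]
    sdists = [name for name in names if name.endswith(".tar.gz")]
    if wheels and not no_binary:
        return wheels[0]
    if sdists:
        return sdists[0]
    return None
```

With both `demo-1.0.tar.gz` and `demo-1.0-py3-none-any.whl` available, the wheel wins; only with `no_binary=True` does the sdist, and therefore its setup.py, come into play.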

A package contains metadata such as its name, version, author and dependencies. There are two primary ways to declare that metadata: setup.py and pyproject.toml. pip prefers pyproject.toml over setup.py, as pyproject.toml is considered setup.py’s successor. Poetry, for example, can produce both setup.py and pyproject.toml when running poetry build; in this case, pip would prioritize pyproject.toml.

If you distribute your package as a wheel, you cannot run install hooks [3]. If you distribute it as a source tarball, however, you can, provided it has a setup.py and not a pyproject.toml. Confusing, right? Don’t worry, just follow along!
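Because everything hinges on which build files an sdist ships, you can check a downloaded tarball before installing it. The sketch below uses only the standard `tarfile` module; `inspect_sdist`, `would_run_setup_py` and `make_demo_sdist` are hypothetical helpers. It lists member names without extracting or executing anything and encodes only the rule of thumb above. It does not look at which build backend a pyproject.toml declares.

```python
import io
import tarfile
from pathlib import PurePosixPath
from typing import BinaryIO, Dict

BUILD_FILES = ("setup.py", "pyproject.toml")


def inspect_sdist(fileobj: BinaryIO) -> Dict[str, bool]:
    """Report which build files sit in the sdist's top-level project directory."""
    found = {name: False for name in BUILD_FILES}
    with tarfile.open(fileobj=fileobj, mode="r:gz") as tar:
        for member in tar.getmembers():
            parts = PurePosixPath(member.name).parts
            # An sdist usually holds one top-level directory, e.g. "test/setup.py".
            if len(parts) == 2 and parts[1] in found:
                found[parts[1]] = True
    return found


def would_run_setup_py(found: Dict[str, bool]) -> bool:
    # The article's simplified rule: a setup.py with no pyproject.toml next to it.
    return found["setup.py"] and not found["pyproject.toml"]


def make_demo_sdist(project: str, files: Dict[str, str]) -> io.BytesIO:
    """Build a tiny in-memory .tar.gz, mimicking `tar -czf test.tar.gz test`."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, text in files.items():
            data = text.encode("utf-8")
            info = tarfile.TarInfo(f"{project}/{name}")
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    buf.seek(0)
    return buf
```

For a real file, open it in binary mode and pass the handle: `with open("test.tar.gz", "rb") as f: print(inspect_sdist(f))`.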

Since setup.py runs during building and installation, it allows arbitrary Python code to execute. There may be legitimate reasons to perform network or file system actions during or after installation, but since this is the most common attack vector, let’s explore how we can reduce the attack surface using audit hooks (PEP 578, available since Python 3.8)!
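Before writing an interactive hook, it helps to see which audit events fire at all. The following is a minimal event recorder built on `sys.addaudithook` and `sys.audit`. The names `recording` and `_recorder_hook` are hypothetical, and the design assumes that hooks cannot be unregistered, so the hook stays installed and only collects events inside a `with` block.

```python
import sys
from contextlib import contextmanager
from typing import Iterator, List, Tuple

Event = Tuple[str, tuple]
_active: List[List[Event]] = []  # stack of lists currently collecting events


def _recorder_hook(event: str, args: tuple) -> None:
    # Audit hooks cannot be removed once added, so this one stays installed
    # for the life of the interpreter and only records inside `recording()`.
    if _active:
        _active[-1].append((event, args))


sys.addaudithook(_recorder_hook)


@contextmanager
def recording() -> Iterator[List[Event]]:
    """Collect every audit event raised while the with-block runs."""
    events: List[Event] = []
    _active.append(events)
    try:
        yield events
    finally:
        _active.pop()
```

For example, wrapping an `open(path).close()` call in `with recording() as events:` and printing `[name for name, _ in events]` should show an `open` entry among the events.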

Test Drive

Let’s set up an audit hook to capture socket.connect and socket.getaddrinfo events:

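# vim hook.py
import sys


def hook(event, args):
    if event == "socket.getaddrinfo":
        # args: (host, port, family, type, proto)
        print("REQUESTING " + event + " " + str(args[0]) + ":" + str(args[1]))
    elif event == "socket.connect":
        # args: (socket, address), where address is (host, port) for IPv4
        print("REQUESTING " + event + " " + str(args[1][0]) + ":" + str(args[1][1]))
    else:
        return
    data = input("y(es) or n(o): ")
    if data != "y":
        sys.exit(1)


# Register the hook; it runs for every audit event raised in this interpreter.
sys.addaudithook(hook)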
Let’s test it.

# run an HTTP server
$ python3 -m http.server
Serving HTTP on port 8000 ( ...

# vim
import hook
import requests

r = requests.get("")
Let’s run

$ pip install requests
$ python3
REQUESTING socket.getaddrinfo
y(es) or n(o): y
REQUESTING socket.connect
y(es) or n(o): y

Your http server should show: - - [15/Jun/2023 14:33:56] "GET / HTTP/1.1" 200 -

Try answering n instead, and notice that the server logs no request.
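The interactive hook is handy for experiments, but it blocks on `input()`. Below is a non-interactive variant, assuming you would rather consult a policy function than prompt. It raises `PermissionError` instead of calling `sys.exit(1)`; an exception raised in an audit hook aborts the audited call. `make_socket_hook`, `try_connect` and `DENIED_HOSTS` are hypothetical names, and 203.0.113.1 is a documentation-only address.

```python
import socket
import sys
from typing import Callable

# Hypothetical policy signature: (event, host, port) -> True to allow.
Policy = Callable[[str, str, object], bool]


def make_socket_hook(policy: Policy) -> Callable[[str, tuple], None]:
    """Build an audit hook that consults `policy` instead of prompting."""

    def hook(event: str, args: tuple) -> None:
        if event == "socket.getaddrinfo":
            # args: (host, port, family, type, proto)
            host, port = args[0], args[1]
        elif event == "socket.connect":
            # args: (socket, address); only (host, port, ...) tuples are checked
            address = args[1]
            if not isinstance(address, tuple):
                return  # e.g. an AF_UNIX path
            host, port = address[0], address[1]
        else:
            return
        if not policy(event, str(host), port):
            # Raising here aborts the audited call before the socket is used.
            raise PermissionError(f"{event} to {host}:{port} blocked by audit hook")

    return hook


def try_connect(host: str, port: int) -> str:
    """Attempt a TCP connection and report what happened."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(2)
        try:
            sock.connect((host, port))
        except PermissionError as exc:
            return f"blocked: {exc}"
        except OSError as exc:
            return f"network error: {exc}"
    return "connected"


# Example policy (hypothetical): deny a single documentation-range address.
DENIED_HOSTS = {"203.0.113.1"}
sys.addaudithook(make_socket_hook(lambda event, host, port: host not in DENIED_HOSTS))
```

A policy function keeps the decision testable and makes it easy to swap prompting for an allowlist or a log-only mode.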

Great! Now let’s introduce this hook into pip. We need to insert the audit hook before pip runs the package’s setup.py. We also need to modify the audit hook slightly, since setup.py is run as a subprocess.

# pip/_internal/utils/setuptools_build.py
import sys
import textwrap
from typing import List, Optional, Sequence


AUDIT_HOOK = textwrap.dedent("""'''
def hook(event, args):
    if event == "socket.getaddrinfo":
        sys.stdout.write("REQUESTING " + event + " "+ str(args[0])+":"+str(args[1]) + os.linesep)
        sys.stdout.flush()
    elif event == "socket.connect":
        sys.stdout.write("REQUESTING " + event + " "+ str(args[1][0])+":"+str(args[1][1]) + os.linesep)
        sys.stdout.flush()
        # TODO: filter anything in venv
    # we can filter for anything from here
    #
    else:
        return
    # The answer (y/n) arrives on stdin, relayed by the parent pip process.
    data = input()
    if data != "y":
        sys.exit(1)

sys.addaudithook(hook)
'''""")


# Shim to wrap setup.py invocation with setuptools
# Note that __file__ is handled via two {!r} *and* %r, to ensure that paths on
# Windows are correctly handled (it should be "C:\\Users" not "C:\Users").
_SETUPTOOLS_SHIM = textwrap.dedent(
    """
    exec(compile('''
    # This is <pip-setuptools-caller> -- a caller that pip uses to run setup.py
    #
    # - It imports setuptools before invoking setup.py, to enable projects that directly
    #   import from `distutils.core` to work with newer packaging standards.
    # - It provides a clear error message when setuptools is not installed.
    # - It sets `sys.argv[0]` to the underlying `setup.py`, when invoking `setup.py` so
    #   setuptools doesn't think the script is `-c`. This avoids the following warning:
    #     manifest_maker: standard file '-c' not found".
    # - It generates a shim setup.py, for handling setup.cfg-only projects.
    import os, sys, tokenize

    try:
        import setuptools
    except ImportError as error:
        print(
            "ERROR: Can not execute `setup.py` since setuptools is not available in "
            "the build environment.",
            file=sys.stderr,
        )
        sys.exit(1)

    __file__ = %r
    sys.argv[0] = __file__

    if os.path.exists(__file__):
        filename = __file__
        with tokenize.open(__file__) as f:
            setup_py_code = f.read().replace('\\r\\n', '\\n')
    else:
        filename = "<auto-generated setuptools caller>"
        setup_py_code = "from setuptools import setup; setup()"
    # setup audit hooks here
    %s
    exec(compile(setup_py_code, filename, "exec"))
    ''' % ({!r}, {}), "<pip-setuptools-caller>", "exec"))
    """
)


def make_setuptools_shim_args(
    setup_py_path: str,
    global_options: Optional[Sequence[str]] = None,
    no_user_config: bool = False,
    unbuffered_output: bool = False,
) -> List[str]:
    """
    Get setuptools command arguments with shim wrapped setup file invocation.

    :param setup_py_path: The path to setup.py to be wrapped.
    :param global_options: Additional global options.
    :param no_user_config: If True, disables personal user configuration.
    :param unbuffered_output: If True, adds the unbuffered switch to the
     argument list.
    """
    args = [sys.executable]
    if unbuffered_output:
        args += ["-u"]
    # The hook source is spliced into the shim, so it runs inside the subprocess.
    args += ["-c", _SETUPTOOLS_SHIM.format(setup_py_path, AUDIT_HOOK)]
    if global_options:
        args += global_options
    if no_user_config:
        args += ["--no-user-cfg"]
    return args


def make_setuptools_bdist_wheel_args(
    setup_py_path: str,
    global_options: Sequence[str],
    build_options: Sequence[str],
    destination_dir: str,
) -> List[str]:
    # NOTE: Eventually, we'd want to also -S to the flags here, when we're
    # isolating. Currently, it breaks Python in virtualenvs, because it
    # relies on site.py to find parts of the standard library outside the
    # virtualenv.
    args = make_setuptools_shim_args(
        setup_py_path, global_options=global_options, unbuffered_output=True
    )
    args += ["bdist_wheel", "-d", destination_dir]
    args += build_options
    return args


def make_setuptools_clean_args(
    setup_py_path: str,
    global_options: Sequence[str],
) -> List[str]:
    args = make_setuptools_shim_args(
        setup_py_path, global_options=global_options, unbuffered_output=True
    )
    args += ["clean", "--all"]
    return args


def make_setuptools_develop_args(
    setup_py_path: str,
    *,
    global_options: Sequence[str],
    no_user_config: bool,
    prefix: Optional[str],
    home: Optional[str],
    use_user_site: bool,
) -> List[str]:
    assert not (use_user_site and prefix)

    args = make_setuptools_shim_args(
        setup_py_path,
        global_options=global_options,
        no_user_config=no_user_config,
    )

    args += ["develop", "--no-deps"]

    if prefix:
        args += ["--prefix", prefix]
    if home is not None:
        args += ["--install-dir", home]

    if use_user_site:
        args += ["--user", "--prefix="]

    return args


def make_setuptools_egg_info_args(
    setup_py_path: str,
    egg_info_dir: Optional[str],
    no_user_config: bool,
) -> List[str]:
    args = make_setuptools_shim_args(setup_py_path, no_user_config=no_user_config)

    args += ["egg_info"]

    if egg_info_dir:
        args += ["--egg-base", egg_info_dir]

    return args
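Why must the hook travel as source text inside the `-c` string? Audit hooks belong to the interpreter that registered them, so a hook added in pip’s own process does not carry over to the child interpreter that runs setup.py. Below is a stripped-down version of the same injection idea. `HOOK_SOURCE`, `SETUP_PY_CODE`, `SHIM` and `run_shimmed` are hypothetical names, and this hook denies with `PermissionError` rather than prompting, so it runs unattended.

```python
import subprocess
import sys
import textwrap

# Hook source that travels inside the child's -c string, like AUDIT_HOOK.
# Unlike the article's hook it denies instead of prompting.
HOOK_SOURCE = textwrap.dedent('''
    def hook(event, args):
        if event == "socket.connect":
            host, port = args[1][0], args[1][1]
            sys.stdout.write("REQUESTING " + event + " " + str(host) + ":" + str(port) + os.linesep)
            sys.stdout.flush()
            raise PermissionError("denied by audit hook")

    sys.addaudithook(hook)
''')

# Hypothetical stand-in for a package's setup.py that tries to phone home.
SETUP_PY_CODE = textwrap.dedent('''
    import socket

    with socket.socket() as s:
        try:
            s.connect(("203.0.113.1", 80))
        except PermissionError as exc:
            print("blocked:", exc)
''')

# Mirrors the shape of _SETUPTOOLS_SHIM: imports, then the hook, then the package code.
SHIM = "import os, sys\n%s\nexec(compile(%r, 'setup.py', 'exec'))\n"


def run_shimmed(setup_code: str) -> str:
    """Run setup_code in a fresh interpreter with the hook pre-installed."""
    code = SHIM % (HOOK_SOURCE, setup_code)
    result = subprocess.run(
        [sys.executable, "-c", code], capture_output=True, text=True, timeout=60
    )
    return result.stdout
```

`run_shimmed(SETUP_PY_CODE)` should print the REQUESTING line from the child followed by the `blocked:` message, without any packet leaving the machine.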

Since this code runs in a subprocess, pip has to relay the user’s answer (y or n) from the main process to the subprocess’s stdin.

# pip/_internal/utils/subprocess.py
import logging
import os
import shlex
import subprocess
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Iterable,
    List,
    Mapping,
    Optional,
    Union,
)

from pip._vendor.rich.markup import escape

from pip._internal.cli.spinners import SpinnerInterface, open_spinner
from pip._internal.exceptions import InstallationSubprocessError
from pip._internal.utils.logging import VERBOSE, subprocess_logger
from pip._internal.utils.misc import HiddenText

if TYPE_CHECKING:
    # Literal was introduced in Python 3.8.
    #
    # TODO: Remove `if TYPE_CHECKING` when dropping support for Python 3.7.
    from typing import Literal

CommandArgs = List[Union[str, HiddenText]]


def make_command(*args: Union[str, HiddenText, CommandArgs]) -> CommandArgs:
    """
    Create a CommandArgs object.
    """
    command_args: CommandArgs = []
    for arg in args:
        # Check for list instead of CommandArgs since CommandArgs is
        # only known during type-checking.
        if isinstance(arg, list):
            command_args.extend(arg)
        else:
            # Otherwise, arg is str or HiddenText.
            command_args.append(arg)

    return command_args


def format_command_args(args: Union[List[str], CommandArgs]) -> str:
    """
    Format command arguments for display.
    """
    # For HiddenText arguments, display the redacted form by calling str().
    # Also, we don't apply str() to arguments that aren't HiddenText since
    # this can trigger a UnicodeDecodeError in Python 2 if the argument
    # has type unicode and includes a non-ascii character.  (The type
    # checker doesn't ensure the annotations are correct in all cases.)
    return " ".join(
        shlex.quote(str(arg)) if isinstance(arg, HiddenText) else shlex.quote(arg)
        for arg in args
    )


def reveal_command_args(args: Union[List[str], CommandArgs]) -> List[str]:
    """
    Return the arguments in their raw, unredacted form.
    """
    return [arg.secret if isinstance(arg, HiddenText) else arg for arg in args]


def call_subprocess(
    cmd: Union[List[str], CommandArgs],
    show_stdout: bool = False,
    cwd: Optional[str] = None,
    on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
    extra_ok_returncodes: Optional[Iterable[int]] = None,
    extra_environ: Optional[Mapping[str, Any]] = None,
    unset_environ: Optional[Iterable[str]] = None,
    spinner: Optional[SpinnerInterface] = None,
    log_failed_cmd: Optional[bool] = True,
    stdout_only: Optional[bool] = False,
    *,
    command_desc: str,
) -> str:
    """
    Args:
      show_stdout: if true, use INFO to log the subprocess's stderr and
        stdout streams.  Otherwise, use DEBUG.  Defaults to False.
      extra_ok_returncodes: an iterable of integer return codes that are
        acceptable, in addition to 0. Defaults to None, which means [].
      unset_environ: an iterable of environment variable names to unset
        prior to calling subprocess.Popen().
      log_failed_cmd: if false, failed commands are not logged, only raised.
      stdout_only: if true, return only stdout, else return both. When true,
        logging of both stdout and stderr occurs when the subprocess has
        terminated, else logging occurs as subprocess output is produced.
    """
    if extra_ok_returncodes is None:
        extra_ok_returncodes = []
    if unset_environ is None:
        unset_environ = []
    # Most places in pip use show_stdout=False. What this means is--
    #
    # - We connect the child's output (combined stderr and stdout) to a
    #   single pipe, which we read.
    # - We log this output to stderr at DEBUG level as it is received.
    # - If DEBUG logging isn't enabled (e.g. if --verbose logging wasn't
    #   requested), then we show a spinner so the user can still see the
    #   subprocess is in progress.
    # - If the subprocess exits with an error, we log the output to stderr
    #   at ERROR level if it hasn't already been displayed to the console
    #   (e.g. if --verbose logging wasn't enabled).  This way we don't log
    #   the output to the console twice.
    #
    # If show_stdout=True, then the above is still done, but with DEBUG
    # replaced by INFO.
    if show_stdout:
        # Then log the subprocess output at INFO level.
        log_subprocess: Callable[..., None] = subprocess_logger.info
        used_level = logging.INFO
    else:
        # Then log the subprocess output using VERBOSE.  This also ensures
        # it will be logged to the log file (aka user_log), if enabled.
        log_subprocess = subprocess_logger.verbose
        used_level = VERBOSE

    # Whether the subprocess will be visible in the console.
    showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level

    # Only use the spinner if we're not showing the subprocess output
    # and we have a spinner.
    use_spinner = not showing_subprocess and spinner is not None

    log_subprocess("Running command %s", command_desc)
    env = os.environ.copy()
    if extra_environ:
        env.update(extra_environ)
    for name in unset_environ:
        env.pop(name, None)
    try:
        proc = subprocess.Popen(
            # Convert HiddenText objects to the underlying str.
            reveal_command_args(cmd),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT if not stdout_only else subprocess.PIPE,
            cwd=cwd,
            env=env,
            errors="backslashreplace",
        )
    except Exception as exc:
        if log_failed_cmd:
            subprocess_logger.critical(
                "Error %s while executing command %s",
                exc,
                command_desc,
            )
        raise
    all_output = []
    if not stdout_only:
        assert proc.stdout
        assert proc.stdin
        # In this mode, stdout and stderr are in the same pipe.
        while True:
            line: str = proc.stdout.readline()
            if not line:
                break
            if 'REQUESTING' in line:
                # Relay the audit hook's request to the user, then send the
                # answer to the setup.py subprocess through its stdin.
                line_without_newline = line.replace('\n', '')
                proc.stdout.flush()
                data = input(f'Package is {line_without_newline}. (y)es or (n)o\n')
                proc.stdin.write(data+'\n')
                proc.stdin.flush()
            else:
                # Show the line immediately.
                log_subprocess(line)
            line = line.rstrip()
            all_output.append(line + "\n")

        try:
            proc.wait()
        finally:
            if proc.stdout:
                proc.stdout.close()
            proc.stdin.close()
        output = "".join(all_output)
    else:
        # In this mode, stdout and stderr are in different pipes.
        # We must use communicate() which is the only safe way to read both.
        out, err = proc.communicate()
        # log line by line to preserve pip log indenting
        for out_line in out.splitlines():
            log_subprocess(out_line)
        all_output.append(out)
        for err_line in err.splitlines():
            log_subprocess(err_line)
        all_output.append(err)
        output = out

    proc_had_error = proc.returncode and proc.returncode not in extra_ok_returncodes
    if use_spinner:
        assert spinner
        if proc_had_error:
            spinner.finish("error")
        else:
            spinner.finish("done")
    if proc_had_error:
        if on_returncode == "raise":
            error = InstallationSubprocessError(
                command_description=command_desc,
                exit_code=proc.returncode,
                output_lines=all_output if not showing_subprocess else None,
            )
            if log_failed_cmd:
                subprocess_logger.error("[present-rich] %s", error)
                subprocess_logger.verbose(
                    "[bold magenta]full command[/]: [blue]%s[/]",
                    escape(format_command_args(cmd)),
                    extra={"markup": True},
                )
                subprocess_logger.verbose(
                    "[bold magenta]cwd[/]: %s",
                    escape(cwd or "[inherit]"),
                    extra={"markup": True},
                )

            raise error
        elif on_returncode == "warn":
            subprocess_logger.warning(
                'Command "%s" had error code %s in %s',
                command_desc,
                proc.returncode,
                cwd,
            )
        elif on_returncode == "ignore":
            pass
        else:
            raise ValueError(f"Invalid value: on_returncode={on_returncode!r}")
    return output


def runner_with_spinner_message(message: str) -> Callable[..., None]:
    """Provide a subprocess_runner that shows a spinner message.

    Intended for use with for BuildBackendHookCaller. Thus, the runner has
    an API that matches what's expected by BuildBackendHookCaller.subprocess_runner.
    """

    def runner(
        cmd: List[str],
        cwd: Optional[str] = None,
        extra_environ: Optional[Mapping[str, Any]] = None,
    ) -> None:
        with open_spinner(message) as spinner:
            call_subprocess(
                cmd,
                command_desc=message,
                cwd=cwd,
                extra_environ=extra_environ,
                spinner=spinner,
            )

    return runner
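Without pip’s logging and spinner plumbing, the relay between the two processes boils down to a small line-based protocol. In this sketch, the child prints a `REQUESTING` line and blocks on `input()`, and the parent reads the line, decides, and writes `y` or `n` to the child’s stdin. `CHILD_CODE` and `run_with_relay` are hypothetical names. The made-up `demo.request` event is raised with `sys.audit()`, so no network is touched, and an `answer` callback stands in for the interactive `input()` prompt.

```python
import subprocess
import sys
import textwrap
from typing import Callable, List, Tuple

# Child side: an audit hook that prints a REQUESTING line and waits for y/n on stdin.
CHILD_CODE = textwrap.dedent('''
    import sys

    def hook(event, args):
        if event != "demo.request":
            return
        print("REQUESTING " + event + " " + str(args[0]), flush=True)
        if input() != "y":
            sys.exit(1)

    sys.addaudithook(hook)
    sys.audit("demo.request", "example.invalid:443")
    print("PROCEEDED", flush=True)
''')


def run_with_relay(code: str, answer: Callable[[str], str]) -> Tuple[int, List[str]]:
    """Parent side: forward each REQUESTING line to `answer`, reply via stdin."""
    proc = subprocess.Popen(
        [sys.executable, "-c", code],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    assert proc.stdin is not None and proc.stdout is not None
    lines: List[str] = []
    for line in iter(proc.stdout.readline, ""):
        line = line.rstrip("\n")
        lines.append(line)
        if line.startswith("REQUESTING"):
            # In pip this is where input() asks the user; here `answer` decides.
            proc.stdin.write(answer(line) + "\n")
            proc.stdin.flush()
    proc.stdin.close()
    return proc.wait(), lines
```

Answering `y` lets the child continue to `PROCEEDED` with exit code 0; answering `n` makes the hook call `sys.exit(1)` before that line is printed. Both sides flush after every line, because without flushing the child could block on `input()` while its request is still sitting in a buffer.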
And that’s it! It should now prompt you every time a package’s setup.py attempts a DNS lookup or socket connection during installation.

Testing our hook:

Let’s create a fake package to test it out:

$ mkdir test
$ vim test/setup.py
# test/setup.py
from setuptools import setup
import requests # you can use socket too

# a module-level network request goes here, e.g. requests.get(<url>)
setup()
$ tar -czf test.tar.gz test
$ pip install test.tar.gz
Processing ./test.tar.gz
  Preparing metadata (setup.py) ...
Package is REQUESTING socket.getaddrinfo (y)es or (n)o

Another method, using custom install and develop commands:

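$ vim test/setup.py
# test/setup.py
from setuptools import setup
from setuptools.command.install import install
from setuptools.command.develop import develop
import requests # you can use socket too


class AfterInstall(install):
    def run(self):
        install.run(self)
        # post-install network request goes here, e.g. requests.get(<url>)


class AfterDevelop(develop):
    def run(self):
        develop.run(self)
        # post-develop network request goes here, e.g. requests.get(<url>)


setup(
    cmdclass={
        'install': AfterInstall,
        'develop': AfterDevelop})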
$ tar -czf test.tar.gz test
$ pip install test.tar.gz
Processing ./test.tar.gz
  Preparing metadata (setup.py) ...
Building wheels for collected packages: UNKNOWN
  Building wheel for UNKNOWN (setup.py)
 ... Package is REQUESTING socket.getaddrinfo (y)es or (n)o

Yes! It works!

Additional checks:

import os
import sys


def hook(event, args):
    if event == "socket.getaddrinfo":
        sys.stdout.write("REQUESTING " + event + " "+ str(args[0])+":"+str(args[1]) + os.linesep)
        sys.stdout.flush()
    elif event == "socket.connect":
        sys.stdout.write("REQUESTING " + event + " "+ str(args[1][0])+":"+str(args[1][1]) + os.linesep)
        sys.stdout.flush()
    elif event == "open":
        # args: (path, mode, flags); path may also be bytes or a file descriptor
        arg = str(args[0])
        if ".ssh" in arg or "shadow" in arg or "passwd" in arg or ".config" in arg or '.env' in arg:
            sys.stdout.write("REQUESTING "+ event +" "+ arg + os.linesep)
            sys.stdout.flush()
        else:
            return
    elif event == "os.system":
        # The command is bytes on POSIX and str on Windows; fsdecode handles both.
        sys.stdout.write("REQUESTING: " + event+ " " + os.fsdecode(args[0]) + os.linesep)
        sys.stdout.flush()
    elif event == "":
        sys.stdout.write("REQUESTING: " + event+ " " + str(args[0]) + os.linesep)
        sys.stdout.flush()
    elif event == "":
        sys.stdout.write("REQUESTING: " + event+ " " + str(args[0]) + os.linesep)
        sys.stdout.flush()
    elif event == "eval":
        sys.stdout.write("REQUESTING: execution of arbitrary code" + os.linesep)
        sys.stdout.flush()
    else:
        return
    data = input()
    if data != "y":
        sys.exit(1)
Note: for open events, it’s easier to maintain a blacklist than a whitelist, since pip itself opens many files while building and installing.
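The substring test in the hook above is coarse: `'.env' in arg` also matches a file such as `.envrc`. As one possible refinement (a variation, not the article’s exact check), the same blacklist can be matched against whole path components. `is_sensitive_path`, `SENSITIVE_DIRS` and `SENSITIVE_NAMES` are hypothetical names.

```python
import os
from pathlib import PurePath
from typing import Union

# The hook's markers, split into directory names and file names.
SENSITIVE_DIRS = {".ssh", ".config"}
SENSITIVE_NAMES = {"shadow", "passwd", ".env"}


def is_sensitive_path(path: Union[str, bytes, int, os.PathLike]) -> bool:
    """Match whole path components instead of raw substrings."""
    if isinstance(path, int):
        return False  # open() also accepts a raw file descriptor
    parts = PurePath(os.fsdecode(path)).parts
    if any(part in SENSITIVE_DIRS for part in parts):
        return True
    return bool(parts) and parts[-1] in SENSITIVE_NAMES
```

To use it from the pip shim, the function would have to be added to the `AUDIT_HOOK` source text, for example as `elif event == "open" and is_sensitive_path(args[0]):`. Component matching misses fewer legitimate files but still will not catch paths reached through symlinks or unusual names.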


Audit hooks are NOT foolproof and can be bypassed by an advanced adversary; see PEP 578 for details. Despite that, this implementation provides a simple and effective defence against the abuse of install-time hooks.

See the full implementation

Use it:

$ python3 -m venv venv
$ cd venv/lib/python3.<yourversion>/site-packages/
$ mv pip old_pip
$ wget
$ unzip
$ mv pip-main pip
$ cd pip
$ mv src/pip/* .
$ cd /to/our/test/package
$ pip install test.tar.gz
# it should work!
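Rather than guessing the `python3.<yourversion>` directory, you can ask the virtual environment’s interpreter where its site-packages lives. Below is a small sketch using the standard `sysconfig` module; the variable names are illustrative.

```python
import sys
import sysconfig

# Run this with the venv's interpreter (e.g. venv/bin/python, or
# venv\Scripts\python on Windows) so the paths belong to that environment.
site_packages = sysconfig.get_paths()["purelib"]
in_venv = sys.prefix != sys.base_prefix

print("site-packages:", site_packages)
print("inside a virtual environment:", in_venv)
```

If the second line prints `False`, the script ran under the system interpreter, and replacing `pip` in that directory would affect the system installation rather than the venv.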

  1. Ohm et al. Backstabber’s Knife Collection: A Review of Open Source Software Supply Chain Attacks

  2. Duan et al. Towards Measuring Supply Chain Attacks on Package Managers for Interpreted Languages

  3.