import json
import logging
from optparse import Values
from typing import TYPE_CHECKING, Generator, List, Optional, Sequence, Tuple, cast
from pip._vendor.packaging.utils import canonicalize_name
from pip._internal.cli import cmdoptions
from pip._internal.cli.req_command import IndexGroupCommand
from pip._internal.cli.status_codes import SUCCESS
from pip._internal.exceptions import CommandError
from pip._internal.index.collector import LinkCollector
from pip._internal.index.package_finder import PackageFinder
from pip._internal.metadata import BaseDistribution, get_environment
from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.network.session import PipSession
from pip._internal.utils.compat import stdlib_pkgs
from pip._internal.utils.misc import tabulate, write_output
if TYPE_CHECKING:
from pip._internal.metadata.base import DistributionVersion
class _DistWithLatestInfo(BaseDistribution):
"""Give the distribution object a couple of extra fields.
These will be populated during ``get_outdated()``. This is dirty but
makes the rest of the code much cleaner.
"""
latest_version: DistributionVersion
latest_filetype: str
_ProcessedDists = Sequence[_DistWithLatestInfo]
logger = logging.getLogger(__name__)
class ListCommand(IndexGroupCommand):
"""
List installed packages, including editables.
Packages are listed in a case-insensitive sorted order.
"""
ignore_require_venv = True
usage = """
%prog [options]"""
def add_options(self) -> None:
self.cmd_opts.add_option(
"-o",
"--outdated",
action="store_true",
default=False,
help="List outdated packages",
)
self.cmd_opts.add_option(
"-u",
"--uptodate",
action="store_true",
default=False,
help="List uptodate packages",
)
self.cmd_opts.add_option(
"-e",
"--editable",
action="store_true",
default=False,
help="List editable projects.",
)
self.cmd_opts.add_option(
"-l",
"--local",
action="store_true",
default=False,
help=(
"If in a virtualenv that has global access, do not list "
"globally-installed packages."
),
)
self.cmd_opts.add_option(
"--user",
dest="user",
action="store_true",
default=False,
help="Only output packages installed in user-site.",
)
self.cmd_opts.add_option(cmdoptions.list_path())
self.cmd_opts.add_option(
"--pre",
action="store_true",
default=False,
help=(
"Include pre-release and development versions. By default, "
"pip only finds stable versions."
),
)
self.cmd_opts.add_option(
"--format",
action="store",
dest="list_format",
default="columns",
choices=("columns", "freeze", "json"),
help="Select the output format among: columns (default), freeze, or json",
)
self.cmd_opts.add_option(
"--not-required",
action="store_true",
dest="not_required",
help="List packages that are not dependencies of installed packages.",
)
self.cmd_opts.add_option(
"--exclude-editable",
action="store_false",
dest="include_editable",
help="Exclude editable package from output.",
)
self.cmd_opts.add_option(
"--include-editable",
action="store_true",
dest="include_editable",
help="Include editable package from output.",
default=True,
)
self.cmd_opts.add_option(cmdoptions.list_exclude())
index_opts = cmdoptions.make_option_group(cmdoptions.index_group, self.parser)
self.parser.insert_option_group(0, index_opts)
self.parser.insert_option_group(0, self.cmd_opts)
def _build_package_finder(
self, options: Values, session: PipSession
) -> PackageFinder:
"""
Create a package finder appropriate to this list command.
"""
link_collector = LinkCollector.create(session, options=options)
# Pass allow_yanked=False to ignore yanked versions.
selection_prefs = SelectionPreferences(
allow_yanked=False,
allow_all_prereleases=options.pre,
)
return PackageFinder.create(
link_collector=link_collector,
selection_prefs=selection_prefs,
)
def run(self, options: Values, args: List[str]) -> int:
if options.outdated and options.uptodate:
raise CommandError("Options --outdated and --uptodate cannot be combined.")
if options.outdated and options.list_format == "freeze":
raise CommandError(
"List format 'freeze' can not be used with the --outdated option."
)
cmdoptions.check_list_path_option(options)
skip = set(stdlib_pkgs)
if options.excludes:
skip.update(canonicalize_name(n) for n in options.excludes)
packages: "_ProcessedDists" = [
cast("_DistWithLatestInfo", d)
for d in get_environment(options.path).iter_installed_distributions(
local_only=options.local,
user_only=options.user,
editables_only=options.editable,
include_editables=options.include_editable,
skip=skip,
)
]
# get_not_required must be called first in order to find and
# filter out all dependencies correctly. Otherwise a package
# can't be identified as a requirement because some parent packages
# could have been filtered out before it.
if options.not_required:
packages = self.get_not_required(packages, options)
if options.outdated:
packages = self.get_outdated(packages, options)
elif options.uptodate:
packages = self.get_uptodate(packages, options)
self.output_package_listing(packages, options)
return SUCCESS
def get_outdated(
self, packages: "_ProcessedDists", options: Values
) -> "_ProcessedDists":
return [
dist
for dist in self.iter_packages_latest_infos(packages, options)
if dist.latest_version > dist.version
]
def get_uptodate(
self, packages: "_ProcessedDists", options: Values
) -> "_ProcessedDists":
return [
dist
for dist in self.iter_packages_latest_infos(packages, options)
if dist.latest_version == dist.version
]
def get_not_required(
self, packages: "_ProcessedDists", options: Values
) -> "_ProcessedDists":
dep_keys = {
canonicalize_name(dep.name)
for dist in packages
for dep in (dist.iter_dependencies() or ())
}
# Create a set to remove duplicate packages, and cast it to a list
# to keep the return type consistent with get_outdated and
# get_uptodate
return list({pkg for pkg in packages if pkg.canonical_name not in dep_keys})
def iter_packages_latest_infos(
self, packages: "_ProcessedDists", options: Values
) -> Generator["_DistWithLatestInfo", None, None]:
with self._build_session(options) as session:
finder = self._build_package_finder(options, session)
def latest_info(
dist: "_DistWithLatestInfo",
) -> Optional["_DistWithLatestInfo"]:
all_candidates = finder.find_all_candidates(dist.canonical_name)
if not options.pre:
# Remove prereleases
all_candidates = [
candidate
for candidate in all_candidates
if not candidate.version.is_prerelease
]
evaluator = finder.make_candidate_evaluator(
project_name=dist.canonical_name,
)
best_candidate = evaluator.sort_best_candidate(all_candidates)
if best_candidate is None:
return None
remote_version = best_candidate.version
if best_candidate.link.is_wheel:
typ = "wheel"
else:
typ = "sdist"
dist.latest_version = remote_version
dist.latest_filetype = typ
return dist
for dist in map(latest_info, packages):
if dist is not None:
yield dist
def output_package_listing(
self, packages: "_ProcessedDists", options: Values
) -> None:
packages = sorted(
packages,
key=lambda dist: dist.canonical_name,
)
if options.list_format == "columns" and packages:
data, header = format_for_columns(packages, options)
self.output_package_listing_columns(data, header)
elif options.list_format == "freeze":
for dist in packages:
if options.verbose >= 1:
write_output(
"%s==%s (%s)", dist.raw_name, dist.version, dist.location
)
else:
write_output("%s==%s", dist.raw_name, dist.version)
elif options.list_format == "json":
write_output(format_for_json(packages, options))
def output_package_listing_columns(
self, data: List[List[str]], header: List[str]
) -> None:
# insert the header first: we need to know the size of column names
if len(data) > 0:
data.insert(0, header)
pkg_strings, sizes = tabulate(data)
# Create and add a separator.
if len(data) > 0:
pkg_strings.insert(1, " ".join(map(lambda x: "-" * x, sizes)))
for val in pkg_strings:
write_output(val)
def format_for_columns(
pkgs: "_ProcessedDists", options: Values
) -> Tuple[List[List[str]], List[str]]:
"""
Convert the package data into something usable
by output_package_listing_columns.
"""
header = ["Package", "Version"]
running_outdated = options.outdated
if running_outdated:
header.extend(["Latest", "Type"])
has_editables = any(x.editable for x in pkgs)
if has_editables:
header.append("Editable project location")
if options.verbose >= 1:
header.append("Location")
if options.verbose >= 1:
header.append("Installer")
data = []
for proj in pkgs:
# if we're working on the 'outdated' list, separate out the
# latest_version and type
row = [proj.raw_name, str(proj.version)]
if running_outdated:
row.append(str(proj.latest_version))
row.append(proj.latest_filetype)
if has_editables:
row.append(proj.editable_project_location or "")
if options.verbose >= 1:
row.append(proj.location or "")
if options.verbose >= 1:
row.append(proj.installer)
data.append(row)
return data, header
def format_for_json(packages: "_ProcessedDists", options: Values) -> str:
data = []
for dist in packages:
info = {
"name": dist.raw_name,
"version": str(dist.version),
}
if options.verbose >= 1:
info["location"] = dist.location or ""
info["installer"] = dist.installer
if options.outdated:
info["latest_version"] = str(dist.latest_version)
info["latest_filetype"] = dist.latest_filetype
editable_project_location = dist.editable_project_location
if editable_project_location:
info["editable_project_location"] = editable_project_location
data.append(info)
return json.dumps(data)
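# Illustrative sketch (not part of pip): how the "columns" output above is
# laid out -- a header row, a dashed separator sized per column, then one row
# per package. The padding is re-implemented locally rather than calling
# pip's internal tabulate(); the package data is made up.
_rows = [["Package", "Version"], ["requests", "2.28.1"], ["pip", "22.3"]]
_sizes = [max(len(row[col]) for row in _rows) for col in range(len(_rows[0]))]
_lines = [" ".join(val.ljust(size) for val, size in zip(row, _sizes)) for row in _rows]
_lines.insert(1, " ".join("-" * size for size in _sizes))
# "\n".join(_lines) ->
# Package  Version
# -------- -------
# requests 2.28.1
# pip      22.3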
import logging
import shutil
import sys
import textwrap
import xmlrpc.client
from collections import OrderedDict
from optparse import Values
from typing import TYPE_CHECKING, Dict, List, Optional
from pip._vendor.packaging.version import parse as parse_version
from pip._internal.cli.base_command import Command
from pip._internal.cli.req_command import SessionCommandMixin
from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
from pip._internal.exceptions import CommandError
from pip._internal.metadata import get_default_environment
from pip._internal.models.index import PyPI
from pip._internal.network.xmlrpc import PipXmlrpcTransport
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import write_output
if TYPE_CHECKING:
from typing import TypedDict
class TransformedHit(TypedDict):
name: str
summary: str
versions: List[str]
logger = logging.getLogger(__name__)
class SearchCommand(Command, SessionCommandMixin):
"""Search for PyPI packages whose name or summary contains <query>."""
usage = """
%prog [options] <query>"""
ignore_require_venv = True
def add_options(self) -> None:
self.cmd_opts.add_option(
"-i",
"--index",
dest="index",
metavar="URL",
default=PyPI.pypi_url,
help="Base URL of Python Package Index (default %default)",
)
self.parser.insert_option_group(0, self.cmd_opts)
def run(self, options: Values, args: List[str]) -> int:
if not args:
raise CommandError("Missing required argument (search query).")
query = args
pypi_hits = self.search(query, options)
hits = transform_hits(pypi_hits)
terminal_width = None
if sys.stdout.isatty():
terminal_width = shutil.get_terminal_size()[0]
print_results(hits, terminal_width=terminal_width)
if pypi_hits:
return SUCCESS
return NO_MATCHES_FOUND
def search(self, query: List[str], options: Values) -> List[Dict[str, str]]:
index_url = options.index
session = self.get_default_session(options)
transport = PipXmlrpcTransport(index_url, session)
pypi = xmlrpc.client.ServerProxy(index_url, transport)
try:
hits = pypi.search({"name": query, "summary": query}, "or")
except xmlrpc.client.Fault as fault:
message = "XMLRPC request failed [code: {code}]\n{string}".format(
code=fault.faultCode,
string=fault.faultString,
)
raise CommandError(message)
assert isinstance(hits, list)
return hits
def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]:
"""
The list from pypi is really a list of versions. We want a list of
packages with the list of versions stored inline. This converts the
list from pypi into one we can use.
"""
packages: Dict[str, "TransformedHit"] = OrderedDict()
for hit in hits:
name = hit["name"]
summary = hit["summary"]
version = hit["version"]
if name not in packages.keys():
packages[name] = {
"name": name,
"summary": summary,
"versions": [version],
}
else:
packages[name]["versions"].append(version)
# if this is the highest version, replace the stored summary
if version == highest_version(packages[name]["versions"]):
packages[name]["summary"] = summary
return list(packages.values())
def print_dist_installation_info(name: str, latest: str) -> None:
env = get_default_environment()
dist = env.get_distribution(name)
if dist is not None:
with indent_log():
if dist.version == latest:
write_output("INSTALLED: %s (latest)", dist.version)
else:
write_output("INSTALLED: %s", dist.version)
if parse_version(latest).pre:
write_output(
"LATEST: %s (pre-release; install"
" with `pip install --pre`)",
latest,
)
else:
write_output("LATEST: %s", latest)
def print_results(
hits: List["TransformedHit"],
name_column_width: Optional[int] = None,
terminal_width: Optional[int] = None,
) -> None:
if not hits:
return
if name_column_width is None:
name_column_width = (
max(
[
len(hit["name"]) + len(highest_version(hit.get("versions", ["-"])))
for hit in hits
]
)
+ 4
)
for hit in hits:
name = hit["name"]
summary = hit["summary"] or ""
latest = highest_version(hit.get("versions", ["-"]))
if terminal_width is not None:
target_width = terminal_width - name_column_width - 5
if target_width > 10:
# wrap and indent summary to fit terminal
summary_lines = textwrap.wrap(summary, target_width)
summary = ("\n" + " " * (name_column_width + 3)).join(summary_lines)
name_latest = f"{name} ({latest})"
line = f"{name_latest:{name_column_width}} - {summary}"
try:
write_output(line)
print_dist_installation_info(name, latest)
except UnicodeEncodeError:
pass
def highest_version(versions: List[str]) -> str:
return max(versions, key=parse_version)
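# Illustrative sketch (not part of pip): how transform_hits() above collapses
# the per-version records returned by the XML-RPC search API into one entry
# per package, keeping the summary of the highest version. Assumes the
# third-party "packaging" library (vendored by pip) for version comparison;
# the hit data is made up.
from packaging.version import parse as _parse_version
_sample_hits = [
    {"name": "demo", "summary": "old summary", "version": "1.0"},
    {"name": "demo", "summary": "new summary", "version": "2.0"},
]
_collapsed = {}
for _hit in _sample_hits:
    _entry = _collapsed.setdefault(
        _hit["name"],
        {"name": _hit["name"], "summary": _hit["summary"], "versions": []},
    )
    _entry["versions"].append(_hit["version"])
    if _hit["version"] == max(_entry["versions"], key=_parse_version):
        _entry["summary"] = _hit["summary"]
# _collapsed["demo"] -> {"name": "demo", "summary": "new summary", "versions": ["1.0", "2.0"]}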
import logging
from optparse import Values
from typing import Generator, Iterable, Iterator, List, NamedTuple, Optional
from pip._vendor.packaging.utils import canonicalize_name
from pip._internal.cli.base_command import Command
from pip._internal.cli.status_codes import ERROR, SUCCESS
from pip._internal.metadata import BaseDistribution, get_default_environment
from pip._internal.utils.misc import write_output
logger = logging.getLogger(__name__)
class ShowCommand(Command):
"""
Show information about one or more installed packages.
The output is in RFC-compliant mail header format.
"""
usage = """
%prog [options] <package> ..."""
ignore_require_venv = True
def add_options(self) -> None:
self.cmd_opts.add_option(
"-f",
"--files",
dest="files",
action="store_true",
default=False,
help="Show the full list of installed files for each package.",
)
self.parser.insert_option_group(0, self.cmd_opts)
def run(self, options: Values, args: List[str]) -> int:
if not args:
logger.warning("ERROR: Please provide a package name or names.")
return ERROR
query = args
results = search_packages_info(query)
if not print_results(
results, list_files=options.files, verbose=options.verbose
):
return ERROR
return SUCCESS
class _PackageInfo(NamedTuple):
name: str
version: str
location: str
editable_project_location: Optional[str]
requires: List[str]
required_by: List[str]
installer: str
metadata_version: str
classifiers: List[str]
summary: str
homepage: str
project_urls: List[str]
author: str
author_email: str
license: str
entry_points: List[str]
files: Optional[List[str]]
def search_packages_info(query: List[str]) -> Generator[_PackageInfo, None, None]:
"""
Gather details from installed distributions. Print distribution name,
version, location, and installed files. Listing installed files requires a
pip-generated 'installed-files.txt' in the distribution's '.egg-info'
directory.
"""
env = get_default_environment()
installed = {dist.canonical_name: dist for dist in env.iter_all_distributions()}
query_names = [canonicalize_name(name) for name in query]
missing = sorted(
[name for name, pkg in zip(query, query_names) if pkg not in installed]
)
if missing:
logger.warning("Package(s) not found: %s", ", ".join(missing))
def _get_requiring_packages(current_dist: BaseDistribution) -> Iterator[str]:
return (
dist.metadata["Name"] or "UNKNOWN"
for dist in installed.values()
if current_dist.canonical_name
in {canonicalize_name(d.name) for d in dist.iter_dependencies()}
)
for query_name in query_names:
try:
dist = installed[query_name]
except KeyError:
continue
requires = sorted((req.name for req in dist.iter_dependencies()), key=str.lower)
required_by = sorted(_get_requiring_packages(dist), key=str.lower)
try:
entry_points_text = dist.read_text("entry_points.txt")
entry_points = entry_points_text.splitlines(keepends=False)
except FileNotFoundError:
entry_points = []
files_iter = dist.iter_declared_entries()
if files_iter is None:
files: Optional[List[str]] = None
else:
files = sorted(files_iter)
metadata = dist.metadata
yield _PackageInfo(
name=dist.raw_name,
version=str(dist.version),
location=dist.location or "",
editable_project_location=dist.editable_project_location,
requires=requires,
required_by=required_by,
installer=dist.installer,
metadata_version=dist.metadata_version or "",
classifiers=metadata.get_all("Classifier", []),
summary=metadata.get("Summary", ""),
homepage=metadata.get("Home-page", ""),
project_urls=metadata.get_all("Project-URL", []),
author=metadata.get("Author", ""),
author_email=metadata.get("Author-email", ""),
license=metadata.get("License", ""),
entry_points=entry_points,
files=files,
)
def print_results(
distributions: Iterable[_PackageInfo],
list_files: bool,
verbose: bool,
) -> bool:
"""
Print the information from installed distributions found.
"""
results_printed = False
for i, dist in enumerate(distributions):
results_printed = True
if i > 0:
write_output("---")
write_output("Name: %s", dist.name)
write_output("Version: %s", dist.version)
write_output("Summary: %s", dist.summary)
write_output("Home-page: %s", dist.homepage)
write_output("Author: %s", dist.author)
write_output("Author-email: %s", dist.author_email)
write_output("License: %s", dist.license)
write_output("Location: %s", dist.location)
if dist.editable_project_location is not None:
write_output(
"Editable project location: %s", dist.editable_project_location
)
write_output("Requires: %s", ", ".join(dist.requires))
write_output("Required-by: %s", ", ".join(dist.required_by))
if verbose:
write_output("Metadata-Version: %s", dist.metadata_version)
write_output("Installer: %s", dist.installer)
write_output("Classifiers:")
for classifier in dist.classifiers:
write_output(" %s", classifier)
write_output("Entry-points:")
for entry in dist.entry_points:
write_output(" %s", entry.strip())
write_output("Project-URLs:")
for project_url in dist.project_urls:
write_output(" %s", project_url)
if list_files:
write_output("Files:")
if dist.files is None:
write_output("Cannot locate RECORD or installed-files.txt")
else:
for line in dist.files:
write_output(" %s", line.strip())
return results_printed
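# Illustrative sketch (not part of pip): the mail-header style output that
# print_results() above produces, reproduced with plain string formatting for
# a made-up record.
_record = {
    "Name": "example-package",
    "Version": "1.2.3",
    "Summary": "An example package.",
    "Requires": "requests, packaging",
    "Required-by": "",
}
_rendered = "\n".join(f"{field}: {value}" for field, value in _record.items())
# Name: example-package
# Version: 1.2.3
# Summary: An example package.
# Requires: requests, packaging
# Required-by: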
import logging
from optparse import Values
from typing import List
from pip._vendor.packaging.utils import canonicalize_name
from pip._internal.cli import cmdoptions
from pip._internal.cli.base_command import Command
from pip._internal.cli.req_command import SessionCommandMixin, warn_if_run_as_root
from pip._internal.cli.status_codes import SUCCESS
from pip._internal.exceptions import InstallationError
from pip._internal.req import parse_requirements
from pip._internal.req.constructors import (
install_req_from_line,
install_req_from_parsed_requirement,
)
from pip._internal.utils.misc import (
check_externally_managed,
protect_pip_from_modification_on_windows,
)
logger = logging.getLogger(__name__)
class UninstallCommand(Command, SessionCommandMixin):
"""
Uninstall packages.
pip is able to uninstall most installed packages. Known exceptions are:
- Pure distutils packages installed with ``python setup.py install``, which
leave behind no metadata to determine what files were installed.
- Script wrappers installed by ``python setup.py develop``.
"""
usage = """
%prog [options] <package> ...
%prog [options] -r <requirements file> ..."""
def add_options(self) -> None:
self.cmd_opts.add_option(
"-r",
"--requirement",
dest="requirements",
action="append",
default=[],
metavar="file",
help=(
"Uninstall all the packages listed in the given requirements "
"file. This option can be used multiple times."
),
)
self.cmd_opts.add_option(
"-y",
"--yes",
dest="yes",
action="store_true",
help="Don't ask for confirmation of uninstall deletions.",
)
self.cmd_opts.add_option(cmdoptions.root_user_action())
self.cmd_opts.add_option(cmdoptions.override_externally_managed())
self.parser.insert_option_group(0, self.cmd_opts)
def run(self, options: Values, args: List[str]) -> int:
session = self.get_default_session(options)
reqs_to_uninstall = {}
for name in args:
req = install_req_from_line(
name,
isolated=options.isolated_mode,
)
if req.name:
reqs_to_uninstall[canonicalize_name(req.name)] = req
else:
logger.warning(
"Invalid requirement: %r ignored -"
" the uninstall command expects named"
" requirements.",
name,
)
for filename in options.requirements:
for parsed_req in parse_requirements(
filename, options=options, session=session
):
req = install_req_from_parsed_requirement(
parsed_req, isolated=options.isolated_mode
)
if req.name:
reqs_to_uninstall[canonicalize_name(req.name)] = req
if not reqs_to_uninstall:
raise InstallationError(
f"You must give at least one requirement to {self.name} (see "
f'"pip help {self.name}")'
)
if not options.override_externally_managed:
check_externally_managed()
protect_pip_from_modification_on_windows(
modifying_pip="pip" in reqs_to_uninstall
)
for req in reqs_to_uninstall.values():
uninstall_pathset = req.uninstall(
auto_confirm=options.yes,
verbose=self.verbosity > 0,
)
if uninstall_pathset:
uninstall_pathset.commit()
if options.root_user_action == "warn":
warn_if_run_as_root()
return SUCCESS
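# Illustrative sketch (not part of pip): typical invocations of the command
# above, driven through subprocess so the example stays in Python. The package
# and requirements-file names are hypothetical.
import subprocess
import sys
# Uninstall one named package without a confirmation prompt.
subprocess.run([sys.executable, "-m", "pip", "uninstall", "--yes", "example-package"], check=True)
# Uninstall everything listed in a requirements file.
subprocess.run([sys.executable, "-m", "pip", "uninstall", "--yes", "-r", "requirements.txt"], check=True)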
import logging
import os
import shutil
from optparse import Values
from typing import List
from pip._internal.cache import WheelCache
from pip._internal.cli import cmdoptions
from pip._internal.cli.req_command import RequirementCommand, with_cleanup
from pip._internal.cli.status_codes import SUCCESS
from pip._internal.exceptions import CommandError
from pip._internal.operations.build.build_tracker import get_build_tracker
from pip._internal.req.req_install import (
InstallRequirement,
LegacySetupPyOptionsCheckMode,
check_legacy_setup_py_options,
)
from pip._internal.utils.deprecation import deprecated
from pip._internal.utils.misc import ensure_dir, normalize_path
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.wheel_builder import build, should_build_for_wheel_command
logger = logging.getLogger(__name__)
class WheelCommand(RequirementCommand):
"""
Build Wheel archives for your requirements and dependencies.
Wheel is a built-package format, and offers the advantage of not
recompiling your software during every install. For more details, see the
wheel docs: https://wheel.readthedocs.io/en/latest/
'pip wheel' uses the build system interface as described here:
https://pip.pypa.io/en/stable/reference/build-system/
"""
usage = """
%prog [options] <requirement specifier> ...
%prog [options] -r <requirements file> ...
%prog [options] [-e] <vcs project url> ...
%prog [options] [-e] <local project path> ...
%prog [options] <archive url/path> ..."""
def add_options(self) -> None:
self.cmd_opts.add_option(
"-w",
"--wheel-dir",
dest="wheel_dir",
metavar="dir",
default=os.curdir,
help=(
"Build wheels into <dir>, where the default is the "
"current working directory."
),
)
self.cmd_opts.add_option(cmdoptions.no_binary())
self.cmd_opts.add_option(cmdoptions.only_binary())
self.cmd_opts.add_option(cmdoptions.prefer_binary())
self.cmd_opts.add_option(cmdoptions.no_build_isolation())
self.cmd_opts.add_option(cmdoptions.use_pep517())
self.cmd_opts.add_option(cmdoptions.no_use_pep517())
self.cmd_opts.add_option(cmdoptions.check_build_deps())
self.cmd_opts.add_option(cmdoptions.constraints())
self.cmd_opts.add_option(cmdoptions.editable())
self.cmd_opts.add_option(cmdoptions.requirements())
self.cmd_opts.add_option(cmdoptions.src())
self.cmd_opts.add_option(cmdoptions.ignore_requires_python())
self.cmd_opts.add_option(cmdoptions.no_deps())
self.cmd_opts.add_option(cmdoptions.progress_bar())
self.cmd_opts.add_option(
"--no-verify",
dest="no_verify",
action="store_true",
default=False,
help="Don't verify if built wheel is valid.",
)
self.cmd_opts.add_option(cmdoptions.config_settings())
self.cmd_opts.add_option(cmdoptions.build_options())
self.cmd_opts.add_option(cmdoptions.global_options())
self.cmd_opts.add_option(
"--pre",
action="store_true",
default=False,
help=(
"Include pre-release and development versions. By default, "
"pip only finds stable versions."
),
)
self.cmd_opts.add_option(cmdoptions.require_hashes())
index_opts = cmdoptions.make_option_group(
cmdoptions.index_group,
self.parser,
)
self.parser.insert_option_group(0, index_opts)
self.parser.insert_option_group(0, self.cmd_opts)
@with_cleanup
def run(self, options: Values, args: List[str]) -> int:
session = self.get_default_session(options)
finder = self._build_package_finder(options, session)
wheel_cache = WheelCache(options.cache_dir, options.format_control)
options.wheel_dir = normalize_path(options.wheel_dir)
ensure_dir(options.wheel_dir)
build_tracker = self.enter_context(get_build_tracker())
directory = TempDirectory(
delete=not options.no_clean,
kind="wheel",
globally_managed=True,
)
reqs = self.get_requirements(args, options, finder, session)
check_legacy_setup_py_options(
options, reqs, LegacySetupPyOptionsCheckMode.WHEEL
)
if "no-binary-enable-wheel-cache" in options.features_enabled:
# TODO: remove format_control from WheelCache when the deprecation cycle
# is over
wheel_cache = WheelCache(options.cache_dir)
else:
if options.format_control.no_binary:
deprecated(
reason=(
"--no-binary currently disables reading from "
"the cache of locally built wheels. In the future "
"--no-binary will not influence the wheel cache."
),
replacement="to use the --no-cache-dir option",
feature_flag="no-binary-enable-wheel-cache",
issue=11453,
gone_in="23.1",
)
wheel_cache = WheelCache(options.cache_dir, options.format_control)
preparer = self.make_requirement_preparer(
temp_build_dir=directory,
options=options,
build_tracker=build_tracker,
session=session,
finder=finder,
download_dir=options.wheel_dir,
use_user_site=False,
verbosity=self.verbosity,
)
resolver = self.make_resolver(
preparer=preparer,
finder=finder,
options=options,
wheel_cache=wheel_cache,
ignore_requires_python=options.ignore_requires_python,
use_pep517=options.use_pep517,
)
self.trace_basic_info(finder)
requirement_set = resolver.resolve(reqs, check_supported_wheels=True)
reqs_to_build: List[InstallRequirement] = []
for req in requirement_set.requirements.values():
if req.is_wheel:
preparer.save_linked_requirement(req)
elif should_build_for_wheel_command(req):
reqs_to_build.append(req)
# build wheels
build_successes, build_failures = build(
reqs_to_build,
wheel_cache=wheel_cache,
verify=(not options.no_verify),
build_options=options.build_options or [],
global_options=options.global_options or [],
)
for req in build_successes:
assert req.link and req.link.is_wheel
assert req.local_file_path
# copy from cache to target directory
try:
shutil.copy(req.local_file_path, options.wheel_dir)
except OSError as e:
logger.warning(
"Building wheel for %s failed: %s",
req.name,
e,
)
build_failures.append(req)
if len(build_failures) != 0:
raise CommandError("Failed to build one or more wheels")
return SUCCESS
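# Illustrative sketch (not part of pip): a typical invocation of the command
# above, building wheels for a requirements file into a local wheelhouse
# directory. The file and directory names are hypothetical.
import subprocess
import sys
subprocess.run(
    [sys.executable, "-m", "pip", "wheel", "--wheel-dir", "wheelhouse", "-r", "requirements.txt"],
    check=True,
)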
"""Configuration management setup
Some terminology:
- name
As written in config files.
- value
Value associated with a name
- key
Name combined with its section (section.name)
- variant
A single word describing where the configuration key-value pair came from
"""
import configparser
import locale
import os
import sys
from typing import Any, Dict, Iterable, List, NewType, Optional, Tuple
from pip._internal.exceptions import (
ConfigurationError,
ConfigurationFileCouldNotBeLoaded,
)
from pip._internal.utils import appdirs
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.logging import getLogger
from pip._internal.utils.misc import ensure_dir, enum
RawConfigParser = configparser.RawConfigParser # Shorthand
Kind = NewType("Kind", str)
CONFIG_BASENAME = "pip.ini" if WINDOWS else "pip.conf"
ENV_NAMES_IGNORED = "version", "help"
# The kinds of configurations there are.
kinds = enum(
USER="user", # User Specific
GLOBAL="global", # System Wide
SITE="site", # [Virtual] Environment Specific
ENV="env", # from PIP_CONFIG_FILE
ENV_VAR="env-var", # from Environment Variables
)
OVERRIDE_ORDER = kinds.GLOBAL, kinds.USER, kinds.SITE, kinds.ENV, kinds.ENV_VAR
VALID_LOAD_ONLY = kinds.USER, kinds.GLOBAL, kinds.SITE
logger = getLogger(__name__)
# NOTE: Maybe use RawConfigParser's optionxform to normalize key names.
def _normalize_name(name: str) -> str:
"""Make a name consistent regardless of source (environment or file)"""
name = name.lower().replace("_", "-")
if name.startswith("--"):
name = name[2:] # only prefer long opts
return name
def _disassemble_key(name: str) -> List[str]:
if "." not in name:
error_message = (
"Key does not contain dot separated section and key. "
"Perhaps you wanted to use 'global.{}' instead?"
).format(name)
raise ConfigurationError(error_message)
return name.split(".", 1)
def get_configuration_files() -> Dict[Kind, List[str]]:
global_config_files = [
os.path.join(path, CONFIG_BASENAME) for path in appdirs.site_config_dirs("pip")
]
site_config_file = os.path.join(sys.prefix, CONFIG_BASENAME)
legacy_config_file = os.path.join(
os.path.expanduser("~"),
"pip" if WINDOWS else ".pip",
CONFIG_BASENAME,
)
new_config_file = os.path.join(appdirs.user_config_dir("pip"), CONFIG_BASENAME)
return {
kinds.GLOBAL: global_config_files,
kinds.SITE: [site_config_file],
kinds.USER: [legacy_config_file, new_config_file],
}
class Configuration:
"""Handles management of configuration.
Provides an interface for accessing and managing configuration files.
This class provides an API that takes "section.key-name" style keys and
stores the value associated with it as "key-name" under the section
"section".
This allows for a clean interface wherein both the section and the
key-name are preserved in an easy-to-manage form in the configuration
files and the stored data stays readable.
"""
def __init__(self, isolated: bool, load_only: Optional[Kind] = None) -> None:
super().__init__()
if load_only is not None and load_only not in VALID_LOAD_ONLY:
raise ConfigurationError(
"Got invalid value for load_only - should be one of {}".format(
", ".join(map(repr, VALID_LOAD_ONLY))
)
)
self.isolated = isolated
self.load_only = load_only
# Because we keep track of where we got the data from
self._parsers: Dict[Kind, List[Tuple[str, RawConfigParser]]] = {
variant: [] for variant in OVERRIDE_ORDER
}
self._config: Dict[Kind, Dict[str, Any]] = {
variant: {} for variant in OVERRIDE_ORDER
}
self._modified_parsers: List[Tuple[str, RawConfigParser]] = []
def load(self) -> None:
"""Loads configuration from configuration files and environment"""
self._load_config_files()
if not self.isolated:
self._load_environment_vars()
def get_file_to_edit(self) -> Optional[str]:
"""Returns the file with highest priority in configuration"""
assert self.load_only is not None, "load_only must be set to select the file to edit"
try:
return self._get_parser_to_modify()[0]
except IndexError:
return None
def items(self) -> Iterable[Tuple[str, Any]]:
"""Returns key-value pairs like dict.items() representing the loaded
configuration
"""
return self._dictionary.items()
def get_value(self, key: str) -> Any:
"""Get a value from the configuration."""
orig_key = key
key = _normalize_name(key)
try:
return self._dictionary[key]
except KeyError:
# disassembling triggers a more useful error message than simply
# "No such key" in the case that the key isn't in the form command.option
_disassemble_key(key)
raise ConfigurationError(f"No such key - {orig_key}")
def set_value(self, key: str, value: Any) -> None:
"""Modify a value in the configuration."""
key = _normalize_name(key)
self._ensure_have_load_only()
assert self.load_only
fname, parser = self._get_parser_to_modify()
if parser is not None:
section, name = _disassemble_key(key)
# Modify the parser and the configuration
if not parser.has_section(section):
parser.add_section(section)
parser.set(section, name, value)
self._config[self.load_only][key] = value
self._mark_as_modified(fname, parser)
def unset_value(self, key: str) -> None:
"""Unset a value in the configuration."""
orig_key = key
key = _normalize_name(key)
self._ensure_have_load_only()
assert self.load_only
if key not in self._config[self.load_only]:
raise ConfigurationError(f"No such key - {orig_key}")
fname, parser = self._get_parser_to_modify()
if parser is not None:
section, name = _disassemble_key(key)
if not (
parser.has_section(section) and parser.remove_option(section, name)
):
# The option was not removed.
raise ConfigurationError(
"Fatal Internal error [id=1]. Please report as a bug."
)
# The section may be empty after the option was removed.
if not parser.items(section):
parser.remove_section(section)
self._mark_as_modified(fname, parser)
del self._config[self.load_only][key]
def save(self) -> None:
"""Save the current in-memory state."""
self._ensure_have_load_only()
for fname, parser in self._modified_parsers:
logger.info("Writing to %s", fname)
# Ensure directory exists.
ensure_dir(os.path.dirname(fname))
with open(fname, "w") as f:
parser.write(f)
#
# Private routines
#
def _ensure_have_load_only(self) -> None:
if self.load_only is None:
raise ConfigurationError("Needed a specific file to be modifying.")
logger.debug("Will be working with %s variant only", self.load_only)
@property
def _dictionary(self) -> Dict[str, Any]:
"""A dictionary representing the loaded configuration."""
# NOTE: Dictionaries are not populated if not loaded. So, conditionals
# are not needed here.
retval = {}
for variant in OVERRIDE_ORDER:
retval.update(self._config[variant])
return retval
def _load_config_files(self) -> None:
"""Loads configuration from configuration files"""
config_files = dict(self.iter_config_files())
if config_files[kinds.ENV][0:1] == [os.devnull]:
logger.debug(
"Skipping loading configuration files due to "
"environment's PIP_CONFIG_FILE being os.devnull"
)
return
for variant, files in config_files.items():
for fname in files:
# If there's a specific variant set in `load_only`, load only
# that variant, not the others.
if self.load_only is not None and variant != self.load_only:
logger.debug("Skipping file '%s' (variant: %s)", fname, variant)
continue
parser = self._load_file(variant, fname)
# Keeping track of the parsers used
self._parsers[variant].append((fname, parser))
def _load_file(self, variant: Kind, fname: str) -> RawConfigParser:
logger.verbose("For variant '%s', will try loading '%s'", variant, fname)
parser = self._construct_parser(fname)
for section in parser.sections():
items = parser.items(section)
self._config[variant].update(self._normalized_keys(section, items))
return parser
def _construct_parser(self, fname: str) -> RawConfigParser:
parser = configparser.RawConfigParser()
# If there is no such file, don't bother reading it but create the
# parser anyway, to hold the data.
# Doing this is useful when modifying and saving files, where we don't
# need to construct a parser.
if os.path.exists(fname):
locale_encoding = locale.getpreferredencoding(False)
try:
parser.read(fname, encoding=locale_encoding)
except UnicodeDecodeError:
# See https://github.com/pypa/pip/issues/4963
raise ConfigurationFileCouldNotBeLoaded(
reason=f"contains invalid {locale_encoding} characters",
fname=fname,
)
except configparser.Error as error:
# See https://github.com/pypa/pip/issues/4893
raise ConfigurationFileCouldNotBeLoaded(error=error)
return parser
def _load_environment_vars(self) -> None:
"""Loads configuration from environment variables"""
self._config[kinds.ENV_VAR].update(
self._normalized_keys(":env:", self.get_environ_vars())
)
def _normalized_keys(
self, section: str, items: Iterable[Tuple[str, Any]]
) -> Dict[str, Any]:
"""Normalizes items to construct a dictionary with normalized keys.
This routine is where the names become keys and are made the same
regardless of source - configuration files or environment.
"""
normalized = {}
for name, val in items:
key = section + "." + _normalize_name(name)
normalized[key] = val
return normalized
def get_environ_vars(self) -> Iterable[Tuple[str, str]]:
"""Returns a generator with all environmental vars with prefix PIP_"""
for key, val in os.environ.items():
if key.startswith("PIP_"):
name = key[4:].lower()
if name not in ENV_NAMES_IGNORED:
yield name, val
# XXX: This is patched in the tests.
def iter_config_files(self) -> Iterable[Tuple[Kind, List[str]]]:
"""Yields variant and configuration files associated with it.
This should be treated like items of a dictionary.
"""
# SMELL: Move the conditions out of this function
# environment variables have the lowest priority
config_file = os.environ.get("PIP_CONFIG_FILE", None)
if config_file is not None:
yield kinds.ENV, [config_file]
else:
yield kinds.ENV, []
config_files = get_configuration_files()
# at the base we have any global configuration
yield kinds.GLOBAL, config_files[kinds.GLOBAL]
# per-user configuration next
should_load_user_config = not self.isolated and not (
config_file and os.path.exists(config_file)
)
if should_load_user_config:
# The legacy config file is overridden by the new config file
yield kinds.USER, config_files[kinds.USER]
# finally, virtualenv configuration, trumping the others
yield kinds.SITE, config_files[kinds.SITE]
def get_values_in_config(self, variant: Kind) -> Dict[str, Any]:
"""Get values present in a config file"""
return self._config[variant]
def _get_parser_to_modify(self) -> Tuple[str, RawConfigParser]:
# Determine which parser to modify
assert self.load_only
parsers = self._parsers[self.load_only]
if not parsers:
# This should not happen if everything works correctly.
raise ConfigurationError(
"Fatal Internal error [id=2]. Please report as a bug."
)
# Use the highest priority parser.
return parsers[-1]
# XXX: This is patched in the tests.
def _mark_as_modified(self, fname: str, parser: RawConfigParser) -> None:
file_parser_tuple = (fname, parser)
if file_parser_tuple not in self._modified_parsers:
self._modified_parsers.append(file_parser_tuple)
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._dictionary!r})"
from pip._internal.distributions.base import AbstractDistribution
from pip._internal.distributions.sdist import SourceDistribution
from pip._internal.distributions.wheel import WheelDistribution
from pip._internal.req.req_install import InstallRequirement
def make_distribution_for_install_requirement(
install_req: InstallRequirement,
) -> AbstractDistribution:
"""Returns a Distribution for the given InstallRequirement"""
# Editable requirements will always be source distributions. They use the
# legacy logic until we create a modern standard for them.
if install_req.editable:
return SourceDistribution(install_req)
# If it's a wheel, it's a WheelDistribution
if install_req.is_wheel:
return WheelDistribution(install_req)
# Otherwise, a SourceDistribution
return SourceDistribution(install_req)
import abc
from pip._internal.index.package_finder import PackageFinder
from pip._internal.metadata.base import BaseDistribution
from pip._internal.req import InstallRequirement
class AbstractDistribution(metaclass=abc.ABCMeta):
"""A base class for handling installable artifacts.
The requirements for anything installable are as follows:
- we must be able to determine the requirement name
(or we can't correctly handle the non-upgrade case).
- for packages with setup requirements, we must also be able
to determine their requirements without installing additional
packages (for the same reason as run-time dependencies)
- we must be able to create a Distribution object exposing the
above metadata.
"""
def __init__(self, req: InstallRequirement) -> None:
super().__init__()
self.req = req
@abc.abstractmethod
def get_metadata_distribution(self) -> BaseDistribution:
raise NotImplementedError()
@abc.abstractmethod
def prepare_distribution_metadata(
self,
finder: PackageFinder,
build_isolation: bool,
check_build_deps: bool,
) -> None:
raise NotImplementedError()
from pip._internal.distributions.base import AbstractDistribution
from pip._internal.index.package_finder import PackageFinder
from pip._internal.metadata import BaseDistribution
class InstalledDistribution(AbstractDistribution):
"""Represents an installed package.
This does not need any preparation as the required information has already
been computed.
"""
def get_metadata_distribution(self) -> BaseDistribution:
assert self.req.satisfied_by is not None, "not actually installed"
return self.req.satisfied_by
def prepare_distribution_metadata(
self,
finder: PackageFinder,
build_isolation: bool,
check_build_deps: bool,
) -> None:
pass
import logging
from typing import Iterable, Set, Tuple
from pip._internal.build_env import BuildEnvironment
from pip._internal.distributions.base import AbstractDistribution
from pip._internal.exceptions import InstallationError
from pip._internal.index.package_finder import PackageFinder
from pip._internal.metadata import BaseDistribution
from pip._internal.utils.subprocess import runner_with_spinner_message
logger = logging.getLogger(__name__)
class SourceDistribution(AbstractDistribution):
"""Represents a source distribution.
The preparation step for these needs metadata for the packages to be
generated, either using PEP 517 or using the legacy `setup.py egg_info`.
"""
def get_metadata_distribution(self) -> BaseDistribution:
return self.req.get_dist()
def prepare_distribution_metadata(
self,
finder: PackageFinder,
build_isolation: bool,
check_build_deps: bool,
) -> None:
# Load pyproject.toml, to determine whether PEP 517 is to be used
self.req.load_pyproject_toml()
# Set up the build isolation, if this requirement should be isolated
should_isolate = self.req.use_pep517 and build_isolation
if should_isolate:
# Setup an isolated environment and install the build backend static
# requirements in it.
self._prepare_build_backend(finder)
# Check that if the requirement is editable, it either supports PEP 660 or
# has a setup.py or a setup.cfg. This cannot be done earlier because we need
# to set up the build backend to verify that it supports build_editable, nor
# can it be done later, because we want to avoid installing build
# requirements needlessly. Doing it here also works around setuptools
# generating UNKNOWN.egg-info when running get_requires_for_build_wheel on a
# directory without a setup.py or setup.cfg.
self.req.isolated_editable_sanity_check()
# Install the dynamic build requirements.
self._install_build_reqs(finder)
# Check if the current environment provides build dependencies
should_check_deps = self.req.use_pep517 and check_build_deps
if should_check_deps:
pyproject_requires = self.req.pyproject_requires
assert pyproject_requires is not None
conflicting, missing = self.req.build_env.check_requirements(
pyproject_requires
)
if conflicting:
self._raise_conflicts("the backend dependencies", conflicting)
if missing:
self._raise_missing_reqs(missing)
self.req.prepare_metadata()
def _prepare_build_backend(self, finder: PackageFinder) -> None:
# Isolate in a BuildEnvironment and install the build-time
# requirements.
pyproject_requires = self.req.pyproject_requires
assert pyproject_requires is not None
self.req.build_env = BuildEnvironment()
self.req.build_env.install_requirements(
finder, pyproject_requires, "overlay", kind="build dependencies"
)
conflicting, missing = self.req.build_env.check_requirements(
self.req.requirements_to_check
)
if conflicting:
self._raise_conflicts("PEP 517/518 supported requirements", conflicting)
if missing:
logger.warning(
"Missing build requirements in pyproject.toml for %s.",
self.req,
)
logger.warning(
"The project does not specify a build backend, and "
"pip cannot fall back to setuptools without %s.",
" and ".join(map(repr, sorted(missing))),
)
def _get_build_requires_wheel(self) -> Iterable[str]:
with self.req.build_env:
runner = runner_with_spinner_message("Getting requirements to build wheel")
backend = self.req.pep517_backend
assert backend is not None
with backend.subprocess_runner(runner):
return backend.get_requires_for_build_wheel()
def _get_build_requires_editable(self) -> Iterable[str]:
with self.req.build_env:
runner = runner_with_spinner_message(
"Getting requirements to build editable"
)
backend = self.req.pep517_backend
assert backend is not None
with backend.subprocess_runner(runner):
return backend.get_requires_for_build_editable()
def _install_build_reqs(self, finder: PackageFinder) -> None:
# Install any extra build dependencies that the backend requests.
# This must be done in a second pass, as the pyproject.toml
# dependencies must be installed before we can call the backend.
if (
self.req.editable
and self.req.permit_editable_wheels
and self.req.supports_pyproject_editable()
):
build_reqs = self._get_build_requires_editable()
else:
build_reqs = self._get_build_requires_wheel()
conflicting, missing = self.req.build_env.check_requirements(build_reqs)
if conflicting:
self._raise_conflicts("the backend dependencies", conflicting)
self.req.build_env.install_requirements(
finder, missing, "normal", kind="backend dependencies"
)
def _raise_conflicts(
self, conflicting_with: str, conflicting_reqs: Set[Tuple[str, str]]
) -> None:
format_string = (
"Some build dependencies for {requirement} "
"conflict with {conflicting_with}: {description}."
)
error_message = format_string.format(
requirement=self.req,
conflicting_with=conflicting_with,
description=", ".join(
f"{installed} is incompatible with {wanted}"
for installed, wanted in sorted(conflicting_reqs)
),
)
raise InstallationError(error_message)
def _raise_missing_reqs(self, missing: Set[str]) -> None:
format_string = (
"Some build dependencies for {requirement} are missing: {missing}."
)
error_message = format_string.format(
requirement=self.req, missing=", ".join(map(repr, sorted(missing)))
)
raise InstallationError(error_message)
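# Illustrative sketch (not part of pip): the static build requirements that
# _prepare_build_backend() above installs come from the [build-system] table
# of pyproject.toml. Reading that table with the standard library (Python
# 3.11+) looks roughly like this; the file path is hypothetical.
import tomllib
with open("pyproject.toml", "rb") as _f:
    _build_system = tomllib.load(_f).get("build-system", {})
_static_requires = _build_system.get("requires", [])  # e.g. ["setuptools>=61"]
_backend_name = _build_system.get("build-backend")  # None -> legacy setup.py path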
from pip._vendor.packaging.utils import canonicalize_name
from pip._internal.distributions.base import AbstractDistribution
from pip._internal.index.package_finder import PackageFinder
from pip._internal.metadata import (
BaseDistribution,
FilesystemWheel,
get_wheel_distribution,
)
class WheelDistribution(AbstractDistribution):
"""Represents a wheel distribution.
This does not need any preparation as wheels can be directly unpacked.
"""
def get_metadata_distribution(self) -> BaseDistribution:
"""Loads the metadata from the wheel file into memory and returns a
Distribution that uses it, not relying on the wheel file or
requirement.
"""
assert self.req.local_file_path, "Set as part of preparation during download"
assert self.req.name, "Wheels are never unnamed"
wheel = FilesystemWheel(self.req.local_file_path)
return get_wheel_distribution(wheel, canonicalize_name(self.req.name))
def prepare_distribution_metadata(
self,
finder: PackageFinder,
build_isolation: bool,
check_build_deps: bool,
) -> None:
pass
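# Illustrative sketch (not part of pip): reading METADATA straight out of a
# wheel archive without unpacking it, which is what makes the in-memory
# distribution above possible. Uses only the standard-library zipfile; the
# wheel path and names are hypothetical (real wheels normalize the name).
import zipfile
def _read_wheel_metadata(wheel_path: str, dist_name: str, version: str) -> str:
    with zipfile.ZipFile(wheel_path) as zf:
        return zf.read(f"{dist_name}-{version}.dist-info/METADATA").decode("utf-8")
# _read_wheel_metadata("demo-1.0-py3-none-any.whl", "demo", "1.0")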
"""Exceptions used throughout package.
This module MUST NOT try to import from anything within `pip._internal` to
operate. This is expected to be importable from any/all files within the
subpackage and, thus, should not depend on them.
"""
import configparser
import contextlib
import locale
import logging
import pathlib
import re
import sys
from itertools import chain, groupby, repeat
from typing import TYPE_CHECKING, Dict, Iterator, List, Optional, Union
from pip._vendor.requests.models import Request, Response
from pip._vendor.rich.console import Console, ConsoleOptions, RenderResult
from pip._vendor.rich.markup import escape
from pip._vendor.rich.text import Text
if TYPE_CHECKING:
from hashlib import _Hash
from typing import Literal
from pip._internal.metadata import BaseDistribution
from pip._internal.req.req_install import InstallRequirement
logger = logging.getLogger(__name__)
#
# Scaffolding
#
def _is_kebab_case(s: str) -> bool:
return re.match(r"^[a-z]+(-[a-z]+)*$", s) is not None
def _prefix_with_indent(
s: Union[Text, str],
console: Console,
*,
prefix: str,
indent: str,
) -> Text:
if isinstance(s, Text):
text = s
else:
text = console.render_str(s)
return console.render_str(prefix, overflow="ignore") + console.render_str(
f"\n{indent}", overflow="ignore"
).join(text.split(allow_blank=True))
class PipError(Exception):
"""The base pip error."""
class DiagnosticPipError(PipError):
"""An error, that presents diagnostic information to the user.
This contains a bunch of logic, to enable pretty presentation of our error
messages. Each error gets a unique reference. Each error can also include
additional context, a hint and/or a note -- which are presented with the
main error message in a consistent style.
This is adapted from the error output styling in `sphinx-theme-builder`.
"""
reference: str
def __init__(
self,
*,
kind: 'Literal["error", "warning"]' = "error",
reference: Optional[str] = None,
message: Union[str, Text],
context: Optional[Union[str, Text]],
hint_stmt: Optional[Union[str, Text]],
note_stmt: Optional[Union[str, Text]] = None,
link: Optional[str] = None,
) -> None:
# Ensure a proper reference is provided.
if reference is None:
assert hasattr(self, "reference"), "error reference not provided!"
reference = self.reference
assert _is_kebab_case(reference), "error reference must be kebab-case!"
self.kind = kind
self.reference = reference
self.message = message
self.context = context
self.note_stmt = note_stmt
self.hint_stmt = hint_stmt
self.link = link
super().__init__(f"<{self.__class__.__name__}: {self.reference}>")
def __repr__(self) -> str:
return (
f"<{self.__class__.__name__}("
f"reference={self.reference!r}, "
f"message={self.message!r}, "
f"context={self.context!r}, "
f"note_stmt={self.note_stmt!r}, "
f"hint_stmt={self.hint_stmt!r}"
")>"
)
def __rich_console__(
self,
console: Console,
options: ConsoleOptions,
) -> RenderResult:
colour = "red" if self.kind == "error" else "yellow"
yield f"[{colour} bold]{self.kind}[/]: [bold]{self.reference}[/]"
yield ""
if not options.ascii_only:
# Present the main message, with relevant context indented.
if self.context is not None:
yield _prefix_with_indent(
self.message,
console,
prefix=f"[{colour}]×[/] ",
indent=f"[{colour}]│[/] ",
)
yield _prefix_with_indent(
self.context,
console,
prefix=f"[{colour}]╰─>[/] ",
indent=f"[{colour}] [/] ",
)
else:
yield _prefix_with_indent(
self.message,
console,
prefix="[red]×[/] ",
indent=" ",
)
else:
yield self.message
if self.context is not None:
yield ""
yield self.context
if self.note_stmt is not None or self.hint_stmt is not None:
yield ""
if self.note_stmt is not None:
yield _prefix_with_indent(
self.note_stmt,
console,
prefix="[magenta bold]note[/]: ",
indent=" ",
)
if self.hint_stmt is not None:
yield _prefix_with_indent(
self.hint_stmt,
console,
prefix="[cyan bold]hint[/]: ",
indent=" ",
)
if self.link is not None:
yield ""
yield f"Link: {self.link}"
#
# Actual Errors
#
class ConfigurationError(PipError):
"""General exception in configuration"""
class InstallationError(PipError):
"""General exception during installation"""
class UninstallationError(PipError):
"""General exception during uninstallation"""
class MissingPyProjectBuildRequires(DiagnosticPipError):
"""Raised when pyproject.toml has `build-system`, but no `build-system.requires`."""
reference = "missing-pyproject-build-system-requires"
def __init__(self, *, package: str) -> None:
super().__init__(
message=f"Can not process {escape(package)}",
context=Text(
"This package has an invalid pyproject.toml file.\n"
"The [build-system] table is missing the mandatory `requires` key."
),
note_stmt="This is an issue with the package mentioned above, not pip.",
hint_stmt=Text("See PEP 518 for the detailed specification."),
)
class InvalidPyProjectBuildRequires(DiagnosticPipError):
"""Raised when pyproject.toml an invalid `build-system.requires`."""
reference = "invalid-pyproject-build-system-requires"
def __init__(self, *, package: str, reason: str) -> None:
super().__init__(
message=f"Can not process {escape(package)}",
context=Text(
"This package has an invalid `build-system.requires` key in "
f"pyproject.toml.\n{reason}"
),
note_stmt="This is an issue with the package mentioned above, not pip.",
hint_stmt=Text("See PEP 518 for the detailed specification."),
)
class NoneMetadataError(PipError):
"""Raised when accessing a Distribution's "METADATA" or "PKG-INFO".
This signifies an inconsistency, when the Distribution claims to have
the metadata file (if not, raise ``FileNotFoundError`` instead), but is
not actually able to produce its content. This may be due to permission
errors.
"""
def __init__(
self,
dist: "BaseDistribution",
metadata_name: str,
) -> None:
"""
:param dist: A Distribution object.
:param metadata_name: The name of the metadata being accessed
(can be "METADATA" or "PKG-INFO").
"""
self.dist = dist
self.metadata_name = metadata_name
def __str__(self) -> str:
# Use `dist` in the error message because its stringification
# includes more information, like the version and location.
return "None {} metadata found for distribution: {}".format(
self.metadata_name,
self.dist,
)
class UserInstallationInvalid(InstallationError):
"""A --user install is requested on an environment without user site."""
def __str__(self) -> str:
return "User base directory is not specified"
class InvalidSchemeCombination(InstallationError):
def __str__(self) -> str:
before = ", ".join(str(a) for a in self.args[:-1])
return f"Cannot set {before} and {self.args[-1]} together"
class DistributionNotFound(InstallationError):
"""Raised when a distribution cannot be found to satisfy a requirement"""
class RequirementsFileParseError(InstallationError):
"""Raised when a general error occurs parsing a requirements file line."""
class BestVersionAlreadyInstalled(PipError):
"""Raised when the most up-to-date version of a package is already
installed."""
class BadCommand(PipError):
"""Raised when virtualenv or a command is not found"""
class CommandError(PipError):
"""Raised when there is an error in command-line arguments"""
class PreviousBuildDirError(PipError):
"""Raised when there's a previous conflicting build directory"""
class NetworkConnectionError(PipError):
"""HTTP connection error"""
def __init__(
self,
error_msg: str,
response: Optional[Response] = None,
request: Optional[Request] = None,
) -> None:
"""
Initialize NetworkConnectionError with `request` and `response`
objects.
"""
self.response = response
self.request = request
self.error_msg = error_msg
if (
self.response is not None
and not self.request
and hasattr(response, "request")
):
self.request = self.response.request
super().__init__(error_msg, response, request)
def __str__(self) -> str:
return str(self.error_msg)
class InvalidWheelFilename(InstallationError):
"""Invalid wheel filename."""
class UnsupportedWheel(InstallationError):
"""Unsupported wheel."""
class InvalidWheel(InstallationError):
"""Invalid (e.g. corrupt) wheel."""
def __init__(self, location: str, name: str):
self.location = location
self.name = name
def __str__(self) -> str:
return f"Wheel '{self.name}' located at {self.location} is invalid."
class MetadataInconsistent(InstallationError):
"""Built metadata contains inconsistent information.
This is raised when the metadata contains values (e.g. name and version)
that do not match the information previously obtained from sdist filename,
user-supplied ``#egg=`` value, or an install requirement name.
"""
def __init__(
self, ireq: "InstallRequirement", field: str, f_val: str, m_val: str
) -> None:
self.ireq = ireq
self.field = field
self.f_val = f_val
self.m_val = m_val
def __str__(self) -> str:
return (
f"Requested {self.ireq} has inconsistent {self.field}: "
f"expected {self.f_val!r}, but metadata has {self.m_val!r}"
)
class LegacyInstallFailure(DiagnosticPipError):
"""Error occurred while executing `setup.py install`"""
reference = "legacy-install-failure"
def __init__(self, package_details: str) -> None:
super().__init__(
message="Encountered error while trying to install package.",
context=package_details,
hint_stmt="See above for output from the failure.",
note_stmt="This is an issue with the package mentioned above, not pip.",
)
class InstallationSubprocessError(DiagnosticPipError, InstallationError):
"""A subprocess call failed."""
reference = "subprocess-exited-with-error"
def __init__(
self,
*,
command_description: str,
exit_code: int,
output_lines: Optional[List[str]],
) -> None:
if output_lines is None:
output_prompt = Text("See above for output.")
else:
output_prompt = (
Text.from_markup(f"[red][{len(output_lines)} lines of output][/]\n")
+ Text("".join(output_lines))
+ Text.from_markup(R"[red]\[end of output][/]")
)
super().__init__(
message=(
f"[green]{escape(command_description)}[/] did not run successfully.\n"
f"exit code: {exit_code}"
),
context=output_prompt,
hint_stmt=None,
note_stmt=(
"This error originates from a subprocess, and is likely not a "
"problem with pip."
),
)
self.command_description = command_description
self.exit_code = exit_code
def __str__(self) -> str:
return f"{self.command_description} exited with {self.exit_code}"
class MetadataGenerationFailed(InstallationSubprocessError, InstallationError):
reference = "metadata-generation-failed"
def __init__(
self,
*,
package_details: str,
) -> None:
super(InstallationSubprocessError, self).__init__(
message="Encountered error while generating package metadata.",
context=escape(package_details),
hint_stmt="See above for details.",
note_stmt="This is an issue with the package mentioned above, not pip.",
)
def __str__(self) -> str:
return "metadata generation failed"
class HashErrors(InstallationError):
"""Multiple HashError instances rolled into one for reporting"""
def __init__(self) -> None:
self.errors: List["HashError"] = []
def append(self, error: "HashError") -> None:
self.errors.append(error)
def __str__(self) -> str:
lines = []
self.errors.sort(key=lambda e: e.order)
for cls, errors_of_cls in groupby(self.errors, lambda e: e.__class__):
lines.append(cls.head)
lines.extend(e.body() for e in errors_of_cls)
if lines:
return "\n".join(lines)
return ""
def __bool__(self) -> bool:
return bool(self.errors)
class HashError(InstallationError):
"""
A failure to verify a package against known-good hashes
:cvar order: An int sorting hash exception classes by difficulty of
recovery (lower being harder), so the user doesn't bother fretting
about unpinned packages when he has deeper issues, like VCS
dependencies, to deal with. Also keeps error reports in a
deterministic order.
:cvar head: A section heading for display above potentially many
exceptions of this kind
:ivar req: The InstallRequirement that triggered this error. This is
pasted on after the exception is instantiated, because it's not
typically available earlier.
"""
req: Optional["InstallRequirement"] = None
head = ""
order: int = -1
def body(self) -> str:
"""Return a summary of me for display under the heading.
This default implementation simply prints a description of the
triggering requirement.
:param req: The InstallRequirement that provoked this error, with
its link already populated by the resolver's _populate_link().
"""
return f" {self._requirement_name()}"
def __str__(self) -> str:
return f"{self.head}\n{self.body()}"
def _requirement_name(self) -> str:
"""Return a description of the requirement that triggered me.
This default implementation returns the long description of the req, with
line numbers.
"""
return str(self.req) if self.req else "unknown package"
class VcsHashUnsupported(HashError):
"""A hash was provided for a version-control-system-based requirement, but
we don't have a method for hashing those."""
order = 0
head = (
"Can't verify hashes for these requirements because we don't "
"have a way to hash version control repositories:"
)
class DirectoryUrlHashUnsupported(HashError):
"""A hash was provided for a version-control-system-based requirement, but
we don't have a method for hashing those."""
order = 1
head = (
"Can't verify hashes for these file:// requirements because they "
"point to directories:"
)
class HashMissing(HashError):
"""A hash was needed for a requirement but is absent."""
order = 2
head = (
"Hashes are required in --require-hashes mode, but they are "
"missing from some requirements. Here is a list of those "
"requirements along with the hashes their downloaded archives "
"actually had. Add lines like these to your requirements files to "
"prevent tampering. (If you did not enable --require-hashes "
"manually, note that it turns on automatically when any package "
"has a hash.)"
)
def __init__(self, gotten_hash: str) -> None:
"""
:param gotten_hash: The hash of the (possibly malicious) archive we
just downloaded
"""
self.gotten_hash = gotten_hash
def body(self) -> str:
# Dodge circular import.
from pip._internal.utils.hashes import FAVORITE_HASH
package = None
if self.req:
# In the case of URL-based requirements, display the original URL
# seen in the requirements file rather than the package name,
# so the output can be directly copied into the requirements file.
package = (
self.req.original_link
if self.req.original_link
# In case someone feeds something downright stupid
# to InstallRequirement's constructor.
else getattr(self.req, "req", None)
)
return " {} --hash={}:{}".format(
package or "unknown package", FAVORITE_HASH, self.gotten_hash
)
class HashUnpinned(HashError):
"""A requirement had a hash specified but was not pinned to a specific
version."""
order = 3
head = (
"In --require-hashes mode, all requirements must have their "
"versions pinned with ==. These do not:"
)
class HashMismatch(HashError):
"""
Distribution file hash values don't match.
:ivar package_name: The name of the package that triggered the hash
mismatch. Feel free to write to this after the exception is raised to
improve its error message.
"""
order = 4
head = (
"THESE PACKAGES DO NOT MATCH THE HASHES FROM THE REQUIREMENTS "
"FILE. If you have updated the package versions, please update "
"the hashes. Otherwise, examine the package contents carefully; "
"someone may have tampered with them."
)
def __init__(self, allowed: Dict[str, List[str]], gots: Dict[str, "_Hash"]) -> None:
"""
:param allowed: A dict of algorithm names pointing to lists of allowed
hex digests
:param gots: A dict of algorithm names pointing to hashes we
actually got from the files under suspicion
"""
self.allowed = allowed
self.gots = gots
def body(self) -> str:
return " {}:\n{}".format(self._requirement_name(), self._hash_comparison())
def _hash_comparison(self) -> str:
"""
Return a comparison of actual and expected hash values.
Example::
Expected sha256 abcdeabcdeabcdeabcdeabcdeabcdeabcdeabcdeabcde
or 123451234512345123451234512345123451234512345
Got bcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdef
"""
def hash_then_or(hash_name: str) -> "chain[str]":
# For now, all the decent hashes have 6-char names, so we can get
# away with hard-coding space literals.
return chain([hash_name], repeat(" or"))
lines: List[str] = []
for hash_name, expecteds in self.allowed.items():
prefix = hash_then_or(hash_name)
lines.extend(
(" Expected {} {}".format(next(prefix), e)) for e in expecteds
)
lines.append(
" Got {}\n".format(self.gots[hash_name].hexdigest())
)
return "\n".join(lines)
class UnsupportedPythonVersion(InstallationError):
"""Unsupported python version according to Requires-Python package
metadata."""
class ConfigurationFileCouldNotBeLoaded(ConfigurationError):
"""When there are errors while loading a configuration file"""
def __init__(
self,
reason: str = "could not be loaded",
fname: Optional[str] = None,
error: Optional[configparser.Error] = None,
) -> None:
super().__init__(error)
self.reason = reason
self.fname = fname
self.error = error
def __str__(self) -> str:
if self.fname is not None:
message_part = f" in {self.fname}."
else:
assert self.error is not None
message_part = f".\n{self.error}\n"
return f"Configuration file {self.reason}{message_part}"
_DEFAULT_EXTERNALLY_MANAGED_ERROR = f"""\
The Python environment under {sys.prefix} is managed externally, and may not be
manipulated by the user. Please use specific tooling from the distributor of
the Python installation to interact with this environment instead.
"""
class ExternallyManagedEnvironment(DiagnosticPipError):
"""The current environment is externally managed.
This is raised when the current environment is externally managed, as
defined by `PEP 668`_. The ``EXTERNALLY-MANAGED`` configuration is checked
and displayed when the error is bubbled up to the user.
:param error: The error message read from ``EXTERNALLY-MANAGED``.
"""
reference = "externally-managed-environment"
def __init__(self, error: Optional[str]) -> None:
if error is None:
context = Text(_DEFAULT_EXTERNALLY_MANAGED_ERROR)
else:
context = Text(error)
super().__init__(
message="This environment is externally managed",
context=context,
note_stmt=(
"If you believe this is a mistake, please contact your "
"Python installation or OS distribution provider. "
"You can override this, at the risk of breaking your Python "
"installation or OS, by passing --break-system-packages."
),
hint_stmt=Text("See PEP 668 for the detailed specification."),
)
@staticmethod
def _iter_externally_managed_error_keys() -> Iterator[str]:
# LC_MESSAGES is in POSIX, but not the C standard. The most common
# platform that does not implement this category is Windows, where
# using other categories for console message localization is equally
# unreliable, so we fall back to the locale-less vendor message. This
# can always be re-evaluated when a vendor proposes a new alternative.
try:
category = locale.LC_MESSAGES
except AttributeError:
lang: Optional[str] = None
else:
lang, _ = locale.getlocale(category)
if lang is not None:
yield f"Error-{lang}"
for sep in ("-", "_"):
before, found, _ = lang.partition(sep)
if not found:
continue
yield f"Error-{before}"
yield "Error"
@classmethod
def from_config(
cls,
config: Union[pathlib.Path, str],
) -> "ExternallyManagedEnvironment":
parser = configparser.ConfigParser(interpolation=None)
try:
parser.read(config, encoding="utf-8")
section = parser["externally-managed"]
for key in cls._iter_externally_managed_error_keys():
with contextlib.suppress(KeyError):
return cls(section[key])
except KeyError:
pass
except (OSError, UnicodeDecodeError, configparser.ParsingError):
from pip._internal.utils._log import VERBOSE
exc_info = logger.isEnabledFor(VERBOSE)
logger.warning("Failed to read %s", config, exc_info=exc_info)
return cls(None)
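# A minimal sketch (assumed usage, not from pip's test suite) of how
# ``from_config`` reads a PEP 668 ``EXTERNALLY-MANAGED`` marker, which is an
# INI file with an ``[externally-managed]`` section:
#
#     [externally-managed]
#     Error = This environment is managed by your OS package manager.
#
#     exc = ExternallyManagedEnvironment.from_config(
#         "/usr/lib/python3.12/EXTERNALLY-MANAGED"  # hypothetical path
#     )
#
# With a matching locale, a more specific ``Error-<lang>`` key wins; with no
# matching key, or an unreadable file, ``cls(None)`` falls back to the default
# message defined above.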
"""
The main purpose of this module is to expose LinkCollector.collect_sources().
"""
import collections
import email.message
import functools
import itertools
import json
import logging
import os
import urllib.parse
import urllib.request
from html.parser import HTMLParser
from optparse import Values
from typing import (
TYPE_CHECKING,
Callable,
Dict,
Iterable,
List,
MutableMapping,
NamedTuple,
Optional,
Sequence,
Tuple,
Union,
)
from pip._vendor import requests
from pip._vendor.requests import Response
from pip._vendor.requests.exceptions import RetryError, SSLError
from pip._internal.exceptions import NetworkConnectionError
from pip._internal.models.link import Link
from pip._internal.models.search_scope import SearchScope
from pip._internal.network.session import PipSession
from pip._internal.network.utils import raise_for_status
from pip._internal.utils.filetypes import is_archive_file
from pip._internal.utils.misc import redact_auth_from_url
from pip._internal.vcs import vcs
from .sources import CandidatesFromPage, LinkSource, build_source
if TYPE_CHECKING:
from typing import Protocol
else:
Protocol = object
logger = logging.getLogger(__name__)
ResponseHeaders = MutableMapping[str, str]
def _match_vcs_scheme(url: str) -> Optional[str]:
"""Look for VCS schemes in the URL.
Returns the matched VCS scheme, or None if there's no match.
"""
for scheme in vcs.schemes:
if url.lower().startswith(scheme) and url[len(scheme)] in "+:":
return scheme
return None
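# Usage sketch (hypothetical URLs), assuming ``vcs.schemes`` contains the usual
# prefixes such as "git", "hg", "svn", and "bzr":
#
#     _match_vcs_scheme("git+https://example.com/repo.git")  # -> "git"
#     _match_vcs_scheme("https://example.com/simple/")       # -> None
#
# The ``url[len(scheme)] in "+:"`` guard keeps URLs that merely start with a
# scheme string (e.g. "github.com/...") from matching.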
class _NotAPIContent(Exception):
def __init__(self, content_type: str, request_desc: str) -> None:
super().__init__(content_type, request_desc)
self.content_type = content_type
self.request_desc = request_desc
def _ensure_api_header(response: Response) -> None:
"""
Check the Content-Type header to ensure the response contains a Simple
API Response.
Raises `_NotAPIContent` if the content type is not a valid content-type.
"""
content_type = response.headers.get("Content-Type", "Unknown")
content_type_l = content_type.lower()
if content_type_l.startswith(
(
"text/html",
"application/vnd.pypi.simple.v1+html",
"application/vnd.pypi.simple.v1+json",
)
):
return
raise _NotAPIContent(content_type, response.request.method)
class _NotHTTP(Exception):
pass
def _ensure_api_response(url: str, session: PipSession) -> None:
"""
Send a HEAD request to the URL, and ensure the response contains a Simple
API response.
Raises `_NotHTTP` if the URL is not available for a HEAD request, or
`_NotAPIContent` if the content type is not a valid content type.
"""
scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url)
if scheme not in {"http", "https"}:
raise _NotHTTP()
resp = session.head(url, allow_redirects=True)
raise_for_status(resp)
_ensure_api_header(resp)
def _get_simple_response(url: str, session: PipSession) -> Response:
"""Access an Simple API response with GET, and return the response.
This consists of three parts:
1. If the URL looks suspiciously like an archive, send a HEAD first to
check the Content-Type is HTML or Simple API, to avoid downloading a
large file. Raise `_NotHTTP` if the content type cannot be determined, or
`_NotAPIContent` if it is not HTML or a Simple API.
2. Actually perform the request. Raise HTTP exceptions on network failures.
3. Check the Content-Type header to make sure we got a Simple API response,
and raise `_NotAPIContent` otherwise.
"""
if is_archive_file(Link(url).filename):
_ensure_api_response(url, session=session)
logger.debug("Getting page %s", redact_auth_from_url(url))
resp = session.get(
url,
headers={
"Accept": ", ".join(
[
"application/vnd.pypi.simple.v1+json",
"application/vnd.pypi.simple.v1+html; q=0.1",
"text/html; q=0.01",
]
),
# We don't want to blindly return cached data for
# /simple/, because authors generally expect that
# twine upload && pip install will function, but if
# they've done a pip install in the last ~10 minutes
# it won't. Thus, by setting this to zero we will not
# blindly use any cached data. The benefit of using
# max-age=0 instead of no-cache is that we still
# support conditional requests, so we still minimize
# traffic sent in cases where the page hasn't changed
# at all; we just always incur the round trip for the
# conditional GET now instead of only once per 10
# minutes.
# For more information, please see pypa/pip#5670.
"Cache-Control": "max-age=0",
},
)
raise_for_status(resp)
# The check for archives above only works if the URL ends with
# something that looks like an archive. However, that is not a
# requirement of a URL. Unless we issue a HEAD request on every
# URL, we cannot know ahead of time for sure whether something is
# a Simple API response or not. However, we can check after we've
# downloaded it.
_ensure_api_header(resp)
logger.debug(
"Fetched page %s as %s",
redact_auth_from_url(url),
resp.headers.get("Content-Type", "Unknown"),
)
return resp
def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]:
"""Determine if we have any encoding information in our headers."""
if headers and "Content-Type" in headers:
m = email.message.Message()
m["content-type"] = headers["Content-Type"]
charset = m.get_param("charset")
if charset:
return str(charset)
return None
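# Charset extraction sketch (hypothetical header values; relies only on the
# standard-library ``email.message`` parsing used above):
#
#     _get_encoding_from_headers({"Content-Type": "text/html; charset=ISO-8859-1"})
#     # -> "ISO-8859-1"
#     _get_encoding_from_headers({"Content-Type": "application/vnd.pypi.simple.v1+json"})
#     # -> None (no charset parameter)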
class CacheablePageContent:
def __init__(self, page: "IndexContent") -> None:
assert page.cache_link_parsing
self.page = page
def __eq__(self, other: object) -> bool:
return isinstance(other, type(self)) and self.page.url == other.page.url
def __hash__(self) -> int:
return hash(self.page.url)
class ParseLinks(Protocol):
def __call__(self, page: "IndexContent") -> Iterable[Link]:
...
def with_cached_index_content(fn: ParseLinks) -> ParseLinks:
"""
Given a function that parses an Iterable[Link] from an IndexContent, cache the
function's result (keyed by CacheablePageContent), unless the IndexContent
`page` has `page.cache_link_parsing == False`.
"""
@functools.lru_cache(maxsize=None)
def wrapper(cacheable_page: CacheablePageContent) -> List[Link]:
return list(fn(cacheable_page.page))
@functools.wraps(fn)
def wrapper_wrapper(page: "IndexContent") -> List[Link]:
if page.cache_link_parsing:
return wrapper(CacheablePageContent(page))
return list(fn(page))
return wrapper_wrapper
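# Behavioural sketch (assumed call pattern): the inner ``wrapper`` is memoised
# with ``functools.lru_cache`` and keyed by ``CacheablePageContent``, which
# hashes and compares on ``page.url``, so repeated parses of the same cacheable
# page are served from memory:
#
#     links_a = parse_links(page)  # parses; result cached if page.cache_link_parsing
#     links_b = parse_links(page)  # same URL -> returned from the lru_cache
#
# Pages built with ``cache_link_parsing=False`` skip the cache entirely.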
@with_cached_index_content
def parse_links(page: "IndexContent") -> Iterable[Link]:
"""
Parse a Simple API's Index Content, and yield its anchor elements as Link objects.
"""
content_type_l = page.content_type.lower()
if content_type_l.startswith("application/vnd.pypi.simple.v1+json"):
data = json.loads(page.content)
for file in data.get("files", []):
link = Link.from_json(file, page.url)
if link is None:
continue
yield link
return
parser = HTMLLinkParser(page.url)
encoding = page.encoding or "utf-8"
parser.feed(page.content.decode(encoding))
url = page.url
base_url = parser.base_url or url
for anchor in parser.anchors:
link = Link.from_element(anchor, page_url=url, base_url=base_url)
if link is None:
continue
yield link
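# A minimal sketch of the JSON (PEP 691) branch above, using a hypothetical
# payload; the file's relative "url" is resolved against the page URL:
#
#     page = IndexContent(
#         content=b'{"name": "pkg", "files": [{"filename": "pkg-1.0.tar.gz",'
#         b' "url": "pkg-1.0.tar.gz", "hashes": {}}]}',
#         content_type="application/vnd.pypi.simple.v1+json",
#         encoding="utf-8",
#         url="https://index.example/simple/pkg/",
#         cache_link_parsing=False,
#     )
#     [link] = parse_links(page)
#     # link.url -> "https://index.example/simple/pkg/pkg-1.0.tar.gz"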
class IndexContent:
"""Represents one response (or page), along with its URL"""
def __init__(
self,
content: bytes,
content_type: str,
encoding: Optional[str],
url: str,
cache_link_parsing: bool = True,
) -> None:
"""
:param encoding: the encoding to decode the given content.
:param url: the URL from which the HTML was downloaded.
:param cache_link_parsing: whether links parsed from this page's url
should be cached. PyPI index urls should
have this set to False, for example.
"""
self.content = content
self.content_type = content_type
self.encoding = encoding
self.url = url
self.cache_link_parsing = cache_link_parsing
def __str__(self) -> str:
return redact_auth_from_url(self.url)
class HTMLLinkParser(HTMLParser):
"""
HTMLParser that keeps the first base HREF and a list of all anchor
elements' attributes.
"""
def __init__(self, url: str) -> None:
super().__init__(convert_charrefs=True)
self.url: str = url
self.base_url: Optional[str] = None
self.anchors: List[Dict[str, Optional[str]]] = []
def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
if tag == "base" and self.base_url is None:
href = self.get_href(attrs)
if href is not None:
self.base_url = href
elif tag == "a":
self.anchors.append(dict(attrs))
def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]:
for name, value in attrs:
if name == "href":
return value
return None
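# Usage sketch (hypothetical markup) for the parser above:
#
#     parser = HTMLLinkParser("https://index.example/simple/pkg/")
#     parser.feed('<base href="https://files.example/">'
#                 '<a href="pkg-1.0.tar.gz">pkg-1.0.tar.gz</a>')
#     # parser.base_url -> "https://files.example/"
#     # parser.anchors  -> [{"href": "pkg-1.0.tar.gz"}]
#
# parse_links() then turns each anchor dict into a Link via Link.from_element(),
# using base_url to resolve relative hrefs.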
def _handle_get_simple_fail(
link: Link,
reason: Union[str, Exception],
meth: Optional[Callable[..., None]] = None,
) -> None:
if meth is None:
meth = logger.debug
meth("Could not fetch URL %s: %s - skipping", link, reason)
def _make_index_content(
response: Response, cache_link_parsing: bool = True
) -> IndexContent:
encoding = _get_encoding_from_headers(response.headers)
return IndexContent(
response.content,
response.headers["Content-Type"],
encoding=encoding,
url=response.url,
cache_link_parsing=cache_link_parsing,
)
def _get_index_content(link: Link, *, session: PipSession) -> Optional["IndexContent"]:
url = link.url.split("#", 1)[0]
# Check for VCS schemes that do not support lookup as web pages.
vcs_scheme = _match_vcs_scheme(url)
if vcs_scheme:
logger.warning(
"Cannot look at %s URL %s because it does not support lookup as web pages.",
vcs_scheme,
link,
)
return None
# Tack index.html onto file:// URLs that point to directories
scheme, _, path, _, _, _ = urllib.parse.urlparse(url)
if scheme == "file" and os.path.isdir(urllib.request.url2pathname(path)):
# add trailing slash if not present so urljoin doesn't trim
# final segment
if not url.endswith("/"):
url += "/"
# TODO: In the future, it would be nice if pip supported PEP 691
# style responses in the file:// URLs, however there's no
# standard file extension for application/vnd.pypi.simple.v1+json
# so we'll need to come up with something on our own.
url = urllib.parse.urljoin(url, "index.html")
logger.debug(" file: URL is directory, getting %s", url)
try:
resp = _get_simple_response(url, session=session)
except _NotHTTP:
logger.warning(
"Skipping page %s because it looks like an archive, and cannot "
"be checked by a HTTP HEAD request.",
link,
)
except _NotAPIContent as exc:
logger.warning(
"Skipping page %s because the %s request got Content-Type: %s. "
"The only supported Content-Types are application/vnd.pypi.simple.v1+json, "
"application/vnd.pypi.simple.v1+html, and text/html",
link,
exc.request_desc,
exc.content_type,
)
except NetworkConnectionError as exc:
_handle_get_simple_fail(link, exc)
except RetryError as exc:
_handle_get_simple_fail(link, exc)
except SSLError as exc:
reason = "There was a problem confirming the ssl certificate: "
reason += str(exc)
_handle_get_simple_fail(link, reason, meth=logger.info)
except requests.ConnectionError as exc:
_handle_get_simple_fail(link, f"connection error: {exc}")
except requests.Timeout:
_handle_get_simple_fail(link, "timed out")
else:
return _make_index_content(resp, cache_link_parsing=link.cache_link_parsing)
return None
class CollectedSources(NamedTuple):
find_links: Sequence[Optional[LinkSource]]
index_urls: Sequence[Optional[LinkSource]]
class LinkCollector:
"""
Responsible for collecting Link objects from all configured locations,
making network requests as needed.
The class's main method is its collect_sources() method.
"""
def __init__(
self,
session: PipSession,
search_scope: SearchScope,
) -> None:
self.search_scope = search_scope
self.session = session
@classmethod
def create(
cls,
session: PipSession,
options: Values,
suppress_no_index: bool = False,
) -> "LinkCollector":
"""
:param session: The Session to use to make requests.
:param suppress_no_index: Whether to ignore the --no-index option
when constructing the SearchScope object.
"""
index_urls = [options.index_url] + options.extra_index_urls
if options.no_index and not suppress_no_index:
logger.debug(
"Ignoring indexes: %s",
",".join(redact_auth_from_url(url) for url in index_urls),
)
index_urls = []
# Make sure find_links is a list before passing to create().
find_links = options.find_links or []
search_scope = SearchScope.create(
find_links=find_links,
index_urls=index_urls,
no_index=options.no_index,
)
link_collector = LinkCollector(
session=session,
search_scope=search_scope,
)
return link_collector
@property
def find_links(self) -> List[str]:
return self.search_scope.find_links
def fetch_response(self, location: Link) -> Optional[IndexContent]:
"""
Fetch an HTML page containing package links.
"""
return _get_index_content(location, session=self.session)
def collect_sources(
self,
project_name: str,
candidates_from_page: CandidatesFromPage,
) -> CollectedSources:
# The OrderedDict calls below deduplicate sources by URL.
index_url_sources = collections.OrderedDict(
build_source(
loc,
candidates_from_page=candidates_from_page,
page_validator=self.session.is_secure_origin,
expand_dir=False,
cache_link_parsing=False,
)
for loc in self.search_scope.get_index_urls_locations(project_name)
).values()
find_links_sources = collections.OrderedDict(
build_source(
loc,
candidates_from_page=candidates_from_page,
page_validator=self.session.is_secure_origin,
expand_dir=True,
cache_link_parsing=True,
)
for loc in self.find_links
).values()
if logger.isEnabledFor(logging.DEBUG):
lines = [
f"* {s.link}"
for s in itertools.chain(find_links_sources, index_url_sources)
if s is not None and s.link is not None
]
lines = [
f"{len(lines)} location(s) to search "
f"for versions of {project_name}:"
] + lines
logger.debug("\n".join(lines))
return CollectedSources(
find_links=list(find_links_sources),
index_urls=list(index_url_sources),
)
"""Routines related to PyPI, indexes"""
import enum
import functools
import itertools
import logging
import re
from typing import TYPE_CHECKING, FrozenSet, Iterable, List, Optional, Set, Tuple, Union
from pip._vendor.packaging import specifiers
from pip._vendor.packaging.tags import Tag
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.packaging.version import _BaseVersion
from pip._vendor.packaging.version import parse as parse_version
from pip._internal.exceptions import (
BestVersionAlreadyInstalled,
DistributionNotFound,
InvalidWheelFilename,
UnsupportedWheel,
)
from pip._internal.index.collector import LinkCollector, parse_links
from pip._internal.models.candidate import InstallationCandidate
from pip._internal.models.format_control import FormatControl
from pip._internal.models.link import Link
from pip._internal.models.search_scope import SearchScope
from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.models.target_python import TargetPython
from pip._internal.models.wheel import Wheel
from pip._internal.req import InstallRequirement
from pip._internal.utils._log import getLogger
from pip._internal.utils.filetypes import WHEEL_EXTENSION
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import build_netloc
from pip._internal.utils.packaging import check_requires_python
from pip._internal.utils.unpacking import SUPPORTED_EXTENSIONS
if TYPE_CHECKING:
from pip._vendor.typing_extensions import TypeGuard
__all__ = ["FormatControl", "BestCandidateResult", "PackageFinder"]
logger = getLogger(__name__)
BuildTag = Union[Tuple[()], Tuple[int, str]]
CandidateSortingKey = Tuple[int, int, int, _BaseVersion, Optional[int], BuildTag]
def _check_link_requires_python(
link: Link,
version_info: Tuple[int, int, int],
ignore_requires_python: bool = False,
) -> bool:
"""
Return whether the given Python version is compatible with a link's
"Requires-Python" value.
:param version_info: A 3-tuple of ints representing the Python
major-minor-micro version to check.
:param ignore_requires_python: Whether to ignore the "Requires-Python"
value if the given Python version isn't compatible.
"""
try:
is_compatible = check_requires_python(
link.requires_python,
version_info=version_info,
)
except specifiers.InvalidSpecifier:
logger.debug(
"Ignoring invalid Requires-Python (%r) for link: %s",
link.requires_python,
link,
)
else:
if not is_compatible:
version = ".".join(map(str, version_info))
if not ignore_requires_python:
logger.verbose(
"Link requires a different Python (%s not in: %r): %s",
version,
link.requires_python,
link,
)
return False
logger.debug(
"Ignoring failed Requires-Python check (%s not in: %r) for link: %s",
version,
link.requires_python,
link,
)
return True
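# The underlying comparison is a PEP 440 specifier check; a sketch of the
# semantics (hypothetical values), using the vendored packaging library
# directly:
#
#     from pip._vendor.packaging.specifiers import SpecifierSet
#     SpecifierSet(">=3.9").contains("3.8.10")  # False -> link is skipped
#     SpecifierSet(">=3.9").contains("3.11.4")  # True  -> link is kept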
class LinkType(enum.Enum):
candidate = enum.auto()
different_project = enum.auto()
yanked = enum.auto()
format_unsupported = enum.auto()
format_invalid = enum.auto()
platform_mismatch = enum.auto()
requires_python_mismatch = enum.auto()
class LinkEvaluator:
"""
Responsible for evaluating links for a particular project.
"""
_py_version_re = re.compile(r"-py([123]\.?[0-9]?)$")
# Don't include an allow_yanked default value to make sure each call
# site considers whether yanked releases are allowed. This also causes
# that decision to be made explicit in the calling code, which helps
# people when reading the code.
def __init__(
self,
project_name: str,
canonical_name: str,
formats: FrozenSet[str],
target_python: TargetPython,
allow_yanked: bool,
ignore_requires_python: Optional[bool] = None,
) -> None:
"""
:param project_name: The user supplied package name.
:param canonical_name: The canonical package name.
:param formats: The formats allowed for this package. Should be a set
with 'binary' or 'source' or both in it.
:param target_python: The target Python interpreter to use when
evaluating link compatibility. This is used, for example, to
check wheel compatibility, as well as when checking the Python
version, e.g. the Python version embedded in a link filename
(or egg fragment) and against an HTML link's optional PEP 503
"data-requires-python" attribute.
:param allow_yanked: Whether files marked as yanked (in the sense
of PEP 592) are permitted to be candidates for install.
:param ignore_requires_python: Whether to ignore incompatible
PEP 503 "data-requires-python" values in HTML links. Defaults
to False.
"""
if ignore_requires_python is None:
ignore_requires_python = False
self._allow_yanked = allow_yanked
self._canonical_name = canonical_name
self._ignore_requires_python = ignore_requires_python
self._formats = formats
self._target_python = target_python
self.project_name = project_name
def evaluate_link(self, link: Link) -> Tuple[LinkType, str]:
"""
Determine whether a link is a candidate for installation.
:return: A tuple (result, detail), where *result* is an enum
representing whether the evaluation found a candidate, or the reason
why one is not found. If a candidate is found, *detail* will be the
candidate's version string; if one is not found, it contains the
reason the link fails to qualify.
"""
version = None
if link.is_yanked and not self._allow_yanked:
reason = link.yanked_reason or "<none given>"
return (LinkType.yanked, f"yanked for reason: {reason}")
if link.egg_fragment:
egg_info = link.egg_fragment
ext = link.ext
else:
egg_info, ext = link.splitext()
if not ext:
return (LinkType.format_unsupported, "not a file")
if ext not in SUPPORTED_EXTENSIONS:
return (
LinkType.format_unsupported,
f"unsupported archive format: {ext}",
)
if "binary" not in self._formats and ext == WHEEL_EXTENSION:
reason = f"No binaries permitted for {self.project_name}"
return (LinkType.format_unsupported, reason)
if "macosx10" in link.path and ext == ".zip":
return (LinkType.format_unsupported, "macosx10 one")
if ext == WHEEL_EXTENSION:
try:
wheel = Wheel(link.filename)
except InvalidWheelFilename:
return (
LinkType.format_invalid,
"invalid wheel filename",
)
if canonicalize_name(wheel.name) != self._canonical_name:
reason = f"wrong project name (not {self.project_name})"
return (LinkType.different_project, reason)
supported_tags = self._target_python.get_tags()
if not wheel.supported(supported_tags):
# Include the wheel's tags in the reason string to
# simplify troubleshooting compatibility issues.
file_tags = ", ".join(wheel.get_formatted_file_tags())
reason = (
f"none of the wheel's tags ({file_tags}) are compatible "
f"(run pip debug --verbose to show compatible tags)"
)
return (LinkType.platform_mismatch, reason)
version = wheel.version
# This should be up by the self.ok_binary check, but see issue 2700.
if "source" not in self._formats and ext != WHEEL_EXTENSION:
reason = f"No sources permitted for {self.project_name}"
return (LinkType.format_unsupported, reason)
if not version:
version = _extract_version_from_fragment(
egg_info,
self._canonical_name,
)
if not version:
reason = f"Missing project version for {self.project_name}"
return (LinkType.format_invalid, reason)
match = self._py_version_re.search(version)
if match:
version = version[: match.start()]
py_version = match.group(1)
if py_version != self._target_python.py_version:
return (
LinkType.platform_mismatch,
"Python version is incorrect",
)
supports_python = _check_link_requires_python(
link,
version_info=self._target_python.py_version_info,
ignore_requires_python=self._ignore_requires_python,
)
if not supports_python:
reason = f"{version} Requires-Python {link.requires_python}"
return (LinkType.requires_python_mismatch, reason)
logger.debug("Found link %s, version: %s", link, version)
return (LinkType.candidate, version)
def filter_unallowed_hashes(
candidates: List[InstallationCandidate],
hashes: Optional[Hashes],
project_name: str,
) -> List[InstallationCandidate]:
"""
Filter out candidates whose hashes aren't allowed, and return a new
list of candidates.
If at least one candidate has an allowed hash, then all candidates with
either an allowed hash or no hash specified are returned. Otherwise,
the given candidates are returned.
Including the candidates with no hash specified when there is a match
allows a warning to be logged if there is a more preferred candidate
with no hash specified. Returning all candidates in the case of no
matches lets pip report the hash of the candidate that would otherwise
have been installed (e.g. permitting the user to more easily update
their requirements file with the desired hash).
"""
if not hashes:
logger.debug(
"Given no hashes to check %s links for project %r: "
"discarding no candidates",
len(candidates),
project_name,
)
# Make sure we're not returning the given list object itself.
return list(candidates)
matches_or_no_digest = []
# Collect the non-matches for logging purposes.
non_matches = []
match_count = 0
for candidate in candidates:
link = candidate.link
if not link.has_hash:
pass
elif link.is_hash_allowed(hashes=hashes):
match_count += 1
else:
non_matches.append(candidate)
continue
matches_or_no_digest.append(candidate)
if match_count:
filtered = matches_or_no_digest
else:
# Make sure we're not returning the given list object itself.
filtered = list(candidates)
if len(filtered) == len(candidates):
discard_message = "discarding no candidates"
else:
discard_message = "discarding {} non-matches:\n {}".format(
len(non_matches),
"\n ".join(str(candidate.link) for candidate in non_matches),
)
logger.debug(
"Checked %s links for project %r against %s hashes "
"(%s matches, %s no digest): %s",
len(candidates),
project_name,
hashes.digest_count,
match_count,
len(matches_or_no_digest) - match_count,
discard_message,
)
return filtered
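# Behavioural sketch (hypothetical candidates), assuming ``hashes`` allows only
# the sdist of version 1.0:
#
#     candidates = [wheel_1_0,   # link hash present but not allowed
#                   sdist_1_0,   # link hash matches
#                   sdist_0_9]   # link carries no hash at all
#     filter_unallowed_hashes(candidates, hashes, "pkg")
#     # -> [sdist_1_0, sdist_0_9]; only the mismatching wheel is discarded.
#
# Had nothing matched, the full list would come back unchanged so the eventual
# hash error can report what was actually seen.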
class CandidatePreferences:
"""
Encapsulates some of the preferences for filtering and sorting
InstallationCandidate objects.
"""
def __init__(
self,
prefer_binary: bool = False,
allow_all_prereleases: bool = False,
) -> None:
"""
:param allow_all_prereleases: Whether to allow all pre-releases.
"""
self.allow_all_prereleases = allow_all_prereleases
self.prefer_binary = prefer_binary
class BestCandidateResult:
"""A collection of candidates, returned by `PackageFinder.find_best_candidate`.
This class is only intended to be instantiated by CandidateEvaluator's
`compute_best_candidate()` method.
"""
def __init__(
self,
candidates: List[InstallationCandidate],
applicable_candidates: List[InstallationCandidate],
best_candidate: Optional[InstallationCandidate],
) -> None:
"""
:param candidates: A sequence of all available candidates found.
:param applicable_candidates: The applicable candidates.
:param best_candidate: The most preferred candidate found, or None
if no applicable candidates were found.
"""
assert set(applicable_candidates) <= set(candidates)
if best_candidate is None:
assert not applicable_candidates
else:
assert best_candidate in applicable_candidates
self._applicable_candidates = applicable_candidates
self._candidates = candidates
self.best_candidate = best_candidate
def iter_all(self) -> Iterable[InstallationCandidate]:
"""Iterate through all candidates."""
return iter(self._candidates)
def iter_applicable(self) -> Iterable[InstallationCandidate]:
"""Iterate through the applicable candidates."""
return iter(self._applicable_candidates)
class CandidateEvaluator:
"""
Responsible for filtering and sorting candidates for installation based
on what tags are valid.
"""
@classmethod
def create(
cls,
project_name: str,
target_python: Optional[TargetPython] = None,
prefer_binary: bool = False,
allow_all_prereleases: bool = False,
specifier: Optional[specifiers.BaseSpecifier] = None,
hashes: Optional[Hashes] = None,
) -> "CandidateEvaluator":
"""Create a CandidateEvaluator object.
:param target_python: The target Python interpreter to use when
checking compatibility. If None (the default), a TargetPython
object will be constructed from the running Python.
:param specifier: An optional object implementing `filter`
(e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
versions.
:param hashes: An optional collection of allowed hashes.
"""
if target_python is None:
target_python = TargetPython()
if specifier is None:
specifier = specifiers.SpecifierSet()
supported_tags = target_python.get_tags()
return cls(
project_name=project_name,
supported_tags=supported_tags,
specifier=specifier,
prefer_binary=prefer_binary,
allow_all_prereleases=allow_all_prereleases,
hashes=hashes,
)
def __init__(
self,
project_name: str,
supported_tags: List[Tag],
specifier: specifiers.BaseSpecifier,
prefer_binary: bool = False,
allow_all_prereleases: bool = False,
hashes: Optional[Hashes] = None,
) -> None:
"""
:param supported_tags: The PEP 425 tags supported by the target
Python in order of preference (most preferred first).
"""
self._allow_all_prereleases = allow_all_prereleases
self._hashes = hashes
self._prefer_binary = prefer_binary
self._project_name = project_name
self._specifier = specifier
self._supported_tags = supported_tags
# Since the index of the tag in the _supported_tags list is used
# as a priority, precompute a map from tag to index/priority to be
# used in wheel.find_most_preferred_tag.
self._wheel_tag_preferences = {
tag: idx for idx, tag in enumerate(supported_tags)
}
def get_applicable_candidates(
self,
candidates: List[InstallationCandidate],
) -> List[InstallationCandidate]:
"""
Return the applicable candidates from a list of candidates.
"""
# Using None infers from the specifier instead.
allow_prereleases = self._allow_all_prereleases or None
specifier = self._specifier
versions = {
str(v)
for v in specifier.filter(
# We turn the version object into a str here because otherwise
# when we're debundled but setuptools isn't, Python will see
# packaging.version.Version and
# pkg_resources._vendor.packaging.version.Version as different
# types. This way we'll use a str as a common data interchange
# format. If we stop using the pkg_resources provided specifier
# and start using our own, we can drop the cast to str().
(str(c.version) for c in candidates),
prereleases=allow_prereleases,
)
}
# Again, converting version to str to deal with debundling.
applicable_candidates = [c for c in candidates if str(c.version) in versions]
filtered_applicable_candidates = filter_unallowed_hashes(
candidates=applicable_candidates,
hashes=self._hashes,
project_name=self._project_name,
)
return sorted(filtered_applicable_candidates, key=self._sort_key)
def _sort_key(self, candidate: InstallationCandidate) -> CandidateSortingKey:
"""
Function to pass as the `key` argument to a call to sorted() to sort
InstallationCandidates by preference.
Returns a tuple such that tuples sorting as greater using Python's
default comparison operator are more preferred.
The preference is as follows:
First and foremost, candidates with allowed (matching) hashes are
always preferred over candidates without matching hashes. This is
because e.g. if the only candidate with an allowed hash is yanked,
we still want to use that candidate.
Second, excepting hash considerations, candidates that have been
yanked (in the sense of PEP 592) are always less preferred than
candidates that haven't been yanked. Then:
If not finding wheels, they are sorted by version only.
If finding wheels, then the sort order is by version, then:
1. existing installs
2. wheels ordered via Wheel.support_index_min(self._supported_tags)
3. source archives
If prefer_binary was set, then all wheels are sorted above sources.
Note: it was considered to embed this logic into the Link
comparison operators, but then different sdist links
with the same version would have to be considered equal.
"""
valid_tags = self._supported_tags
support_num = len(valid_tags)
build_tag: BuildTag = ()
binary_preference = 0
link = candidate.link
if link.is_wheel:
# can raise InvalidWheelFilename
wheel = Wheel(link.filename)
try:
pri = -(
wheel.find_most_preferred_tag(
valid_tags, self._wheel_tag_preferences
)
)
except ValueError:
raise UnsupportedWheel(
"{} is not a supported wheel for this platform. It "
"can't be sorted.".format(wheel.filename)
)
if self._prefer_binary:
binary_preference = 1
if wheel.build_tag is not None:
match = re.match(r"^(\d+)(.*)$", wheel.build_tag)
assert match is not None, "guaranteed by filename validation"
build_tag_groups = match.groups()
build_tag = (int(build_tag_groups[0]), build_tag_groups[1])
else: # sdist
pri = -(support_num)
has_allowed_hash = int(link.is_hash_allowed(self._hashes))
yank_value = -1 * int(link.is_yanked) # -1 for yanked.
return (
has_allowed_hash,
yank_value,
binary_preference,
candidate.version,
pri,
build_tag,
)
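# Ordering sketch: the key tuples compare element by element, left to right,
# so (with made-up values) a hash-matching but yanked sdist still outranks a
# newer wheel whose hash does not match:
#
#     key_match_yanked = (1, -1, 0, parse_version("1.0"), -10, ())
#     key_no_match     = (0, 0, 1, parse_version("2.0"), -1, (1, ""))
#     max([key_no_match, key_match_yanked]) == key_match_yanked  # True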
def sort_best_candidate(
self,
candidates: List[InstallationCandidate],
) -> Optional[InstallationCandidate]:
"""
Return the best candidate per the instance's sort order, or None if
no candidate is acceptable.
"""
if not candidates:
return None
best_candidate = max(candidates, key=self._sort_key)
return best_candidate
def compute_best_candidate(
self,
candidates: List[InstallationCandidate],
) -> BestCandidateResult:
"""
Compute and return a `BestCandidateResult` instance.
"""
applicable_candidates = self.get_applicable_candidates(candidates)
best_candidate = self.sort_best_candidate(applicable_candidates)
return BestCandidateResult(
candidates,
applicable_candidates=applicable_candidates,
best_candidate=best_candidate,
)
class PackageFinder:
"""This finds packages.
This is meant to match easy_install's technique for looking for
packages, by reading pages and looking for appropriate links.
"""
def __init__(
self,
link_collector: LinkCollector,
target_python: TargetPython,
allow_yanked: bool,
format_control: Optional[FormatControl] = None,
candidate_prefs: Optional[CandidatePreferences] = None,
ignore_requires_python: Optional[bool] = None,
) -> None:
"""
This constructor is primarily meant to be used by the create() class
method and from tests.
:param format_control: A FormatControl object, used to control
the selection of source packages / binary packages when consulting
the index and links.
:param candidate_prefs: Options to use when creating a
CandidateEvaluator object.
"""
if candidate_prefs is None:
candidate_prefs = CandidatePreferences()
format_control = format_control or FormatControl(set(), set())
self._allow_yanked = allow_yanked
self._candidate_prefs = candidate_prefs
self._ignore_requires_python = ignore_requires_python
self._link_collector = link_collector
self._target_python = target_python
self.format_control = format_control
# These are boring links that have already been logged somehow.
self._logged_links: Set[Tuple[Link, LinkType, str]] = set()
# Don't include an allow_yanked default value to make sure each call
# site considers whether yanked releases are allowed. This also causes
# that decision to be made explicit in the calling code, which helps
# people when reading the code.
@classmethod
def create(
cls,
link_collector: LinkCollector,
selection_prefs: SelectionPreferences,
target_python: Optional[TargetPython] = None,
) -> "PackageFinder":
"""Create a PackageFinder.
:param selection_prefs: The candidate selection preferences, as a
SelectionPreferences object.
:param target_python: The target Python interpreter to use when
checking compatibility. If None (the default), a TargetPython
object will be constructed from the running Python.
"""
if target_python is None:
target_python = TargetPython()
candidate_prefs = CandidatePreferences(
prefer_binary=selection_prefs.prefer_binary,
allow_all_prereleases=selection_prefs.allow_all_prereleases,
)
return cls(
candidate_prefs=candidate_prefs,
link_collector=link_collector,
target_python=target_python,
allow_yanked=selection_prefs.allow_yanked,
format_control=selection_prefs.format_control,
ignore_requires_python=selection_prefs.ignore_requires_python,
)
@property
def target_python(self) -> TargetPython:
return self._target_python
@property
def search_scope(self) -> SearchScope:
return self._link_collector.search_scope
@search_scope.setter
def search_scope(self, search_scope: SearchScope) -> None:
self._link_collector.search_scope = search_scope
@property
def find_links(self) -> List[str]:
return self._link_collector.find_links
@property
def index_urls(self) -> List[str]:
return self.search_scope.index_urls
@property
def trusted_hosts(self) -> Iterable[str]:
for host_port in self._link_collector.session.pip_trusted_origins:
yield build_netloc(*host_port)
@property
def allow_all_prereleases(self) -> bool:
return self._candidate_prefs.allow_all_prereleases
def set_allow_all_prereleases(self) -> None:
self._candidate_prefs.allow_all_prereleases = True
@property
def prefer_binary(self) -> bool:
return self._candidate_prefs.prefer_binary
def set_prefer_binary(self) -> None:
self._candidate_prefs.prefer_binary = True
def requires_python_skipped_reasons(self) -> List[str]:
reasons = {
detail
for _, result, detail in self._logged_links
if result == LinkType.requires_python_mismatch
}
return sorted(reasons)
def make_link_evaluator(self, project_name: str) -> LinkEvaluator:
canonical_name = canonicalize_name(project_name)
formats = self.format_control.get_allowed_formats(canonical_name)
return LinkEvaluator(
project_name=project_name,
canonical_name=canonical_name,
formats=formats,
target_python=self._target_python,
allow_yanked=self._allow_yanked,
ignore_requires_python=self._ignore_requires_python,
)
def _sort_links(self, links: Iterable[Link]) -> List[Link]:
"""
Returns elements of links in order, non-egg links first, egg links
second, while eliminating duplicates
"""
eggs, no_eggs = [], []
seen: Set[Link] = set()
for link in links:
if link not in seen:
seen.add(link)
if link.egg_fragment:
eggs.append(link)
else:
no_eggs.append(link)
return no_eggs + eggs
def _log_skipped_link(self, link: Link, result: LinkType, detail: str) -> None:
entry = (link, result, detail)
if entry not in self._logged_links:
# Put the link at the end so the reason is more visible and because
# the link string is usually very long.
logger.debug("Skipping link: %s: %s", detail, link)
self._logged_links.add(entry)
def get_install_candidate(
self, link_evaluator: LinkEvaluator, link: Link
) -> Optional[InstallationCandidate]:
"""
If the link is a candidate for install, convert it to an
InstallationCandidate and return it. Otherwise, return None.
"""
result, detail = link_evaluator.evaluate_link(link)
if result != LinkType.candidate:
self._log_skipped_link(link, result, detail)
return None
return InstallationCandidate(
name=link_evaluator.project_name,
link=link,
version=detail,
)
def evaluate_links(
self, link_evaluator: LinkEvaluator, links: Iterable[Link]
) -> List[InstallationCandidate]:
"""
Convert links that are candidates to InstallationCandidate objects.
"""
candidates = []
for link in self._sort_links(links):
candidate = self.get_install_candidate(link_evaluator, link)
if candidate is not None:
candidates.append(candidate)
return candidates
def process_project_url(
self, project_url: Link, link_evaluator: LinkEvaluator
) -> List[InstallationCandidate]:
logger.debug(
"Fetching project page and analyzing links: %s",
project_url,
)
index_response = self._link_collector.fetch_response(project_url)
if index_response is None:
return []
page_links = list(parse_links(index_response))
with indent_log():
package_links = self.evaluate_links(
link_evaluator,
links=page_links,
)
return package_links
@functools.lru_cache(maxsize=None)
def find_all_candidates(self, project_name: str) -> List[InstallationCandidate]:
"""Find all available InstallationCandidate for project_name
This checks index_urls and find_links.
All versions found are returned as an InstallationCandidate list.
See LinkEvaluator.evaluate_link() for details on which files
are accepted.
"""
link_evaluator = self.make_link_evaluator(project_name)
collected_sources = self._link_collector.collect_sources(
project_name=project_name,
candidates_from_page=functools.partial(
self.process_project_url,
link_evaluator=link_evaluator,
),
)
page_candidates_it = itertools.chain.from_iterable(
source.page_candidates()
for sources in collected_sources
for source in sources
if source is not None
)
page_candidates = list(page_candidates_it)
file_links_it = itertools.chain.from_iterable(
source.file_links()
for sources in collected_sources
for source in sources
if source is not None
)
file_candidates = self.evaluate_links(
link_evaluator,
sorted(file_links_it, reverse=True),
)
if logger.isEnabledFor(logging.DEBUG) and file_candidates:
paths = []
for candidate in file_candidates:
assert candidate.link.url # we need to have a URL
try:
paths.append(candidate.link.file_path)
except Exception:
paths.append(candidate.link.url) # it's not a local file
logger.debug("Local files found: %s", ", ".join(paths))
# This is an intentional priority ordering
return file_candidates + page_candidates
def make_candidate_evaluator(
self,
project_name: str,
specifier: Optional[specifiers.BaseSpecifier] = None,
hashes: Optional[Hashes] = None,
) -> CandidateEvaluator:
"""Create a CandidateEvaluator object to use."""
candidate_prefs = self._candidate_prefs
return CandidateEvaluator.create(
project_name=project_name,
target_python=self._target_python,
prefer_binary=candidate_prefs.prefer_binary,
allow_all_prereleases=candidate_prefs.allow_all_prereleases,
specifier=specifier,
hashes=hashes,
)
@functools.lru_cache(maxsize=None)
def find_best_candidate(
self,
project_name: str,
specifier: Optional[specifiers.BaseSpecifier] = None,
hashes: Optional[Hashes] = None,
) -> BestCandidateResult:
"""Find matches for the given project and specifier.
:param specifier: An optional object implementing `filter`
(e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
versions.
:return: A `BestCandidateResult` instance.
"""
candidates = self.find_all_candidates(project_name)
candidate_evaluator = self.make_candidate_evaluator(
project_name=project_name,
specifier=specifier,
hashes=hashes,
)
return candidate_evaluator.compute_best_candidate(candidates)
def find_requirement(
self, req: InstallRequirement, upgrade: bool
) -> Optional[InstallationCandidate]:
"""Try to find a Link matching req
Expects req, an InstallRequirement and upgrade, a boolean
Returns a InstallationCandidate if found,
Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise
"""
hashes = req.hashes(trust_internet=False)
best_candidate_result = self.find_best_candidate(
req.name,
specifier=req.specifier,
hashes=hashes,
)
best_candidate = best_candidate_result.best_candidate
installed_version: Optional[_BaseVersion] = None
if req.satisfied_by is not None:
installed_version = req.satisfied_by.version
def _format_versions(cand_iter: Iterable[InstallationCandidate]) -> str:
# This repeated parse_version and str() conversion is needed to
# handle different vendoring sources from pip and pkg_resources.
# If we stop using the pkg_resources provided specifier and start
# using our own, we can drop the cast to str().
return (
", ".join(
sorted(
{str(c.version) for c in cand_iter},
key=parse_version,
)
)
or "none"
)
if installed_version is None and best_candidate is None:
logger.critical(
"Could not find a version that satisfies the requirement %s "
"(from versions: %s)",
req,
_format_versions(best_candidate_result.iter_all()),
)
raise DistributionNotFound(
"No matching distribution found for {}".format(req)
)
def _should_install_candidate(
candidate: Optional[InstallationCandidate],
) -> "TypeGuard[InstallationCandidate]":
if installed_version is None:
return True
if best_candidate is None:
return False
return best_candidate.version > installed_version
if not upgrade and installed_version is not None:
if _should_install_candidate(best_candidate):
logger.debug(
"Existing installed version (%s) satisfies requirement "
"(most up-to-date version is %s)",
installed_version,
best_candidate.version,
)
else:
logger.debug(
"Existing installed version (%s) is most up-to-date and "
"satisfies requirement",
installed_version,
)
return None
if _should_install_candidate(best_candidate):
logger.debug(
"Using version %s (newest of versions: %s)",
best_candidate.version,
_format_versions(best_candidate_result.iter_applicable()),
)
return best_candidate
# We have an existing version, and it's the best version
logger.debug(
"Installed version (%s) is most up-to-date (past versions: %s)",
installed_version,
_format_versions(best_candidate_result.iter_applicable()),
)
raise BestVersionAlreadyInstalled
def _find_name_version_sep(fragment: str, canonical_name: str) -> int:
"""Find the separator's index based on the package's canonical name.
:param fragment: A <package>+<version> filename "fragment" (stem) or
egg fragment.
:param canonical_name: The package's canonical name.
This function is needed since the canonicalized name does not necessarily
have the same length as the egg info's name part. An example::
>>> fragment = 'foo__bar-1.0'
>>> canonical_name = 'foo-bar'
>>> _find_name_version_sep(fragment, canonical_name)
8
"""
# Project name and version must be separated by one single dash. Find all
# occurrences of dashes; if the string in front of it matches the canonical
# name, this is the one separating the name and version parts.
for i, c in enumerate(fragment):
if c != "-":
continue
if canonicalize_name(fragment[:i]) == canonical_name:
return i
raise ValueError(f"{fragment} does not match {canonical_name}")
def _extract_version_from_fragment(fragment: str, canonical_name: str) -> Optional[str]:
"""Parse the version string from a <package>+<version> filename
"fragment" (stem) or egg fragment.
:param fragment: The string to parse. E.g. foo-2.1
:param canonical_name: The canonicalized name of the package this
belongs to.
"""
try:
version_start = _find_name_version_sep(fragment, canonical_name) + 1
except ValueError:
return None
version = fragment[version_start:]
if not version:
return None
return version
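# Doctest-style sketch (hypothetical fragments), complementing the example in
# _find_name_version_sep above:
#
#     >>> _extract_version_from_fragment("foo-bar-1.0", "foo-bar")
#     '1.0'
#     >>> _extract_version_from_fragment("unrelated-1.0", "foo-bar") is None
#     True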
import logging
import mimetypes
import os
import pathlib
from typing import Callable, Iterable, Optional, Tuple
from pip._internal.models.candidate import InstallationCandidate
from pip._internal.models.link import Link
from pip._internal.utils.urls import path_to_url, url_to_path
from pip._internal.vcs import is_url
logger = logging.getLogger(__name__)
FoundCandidates = Iterable[InstallationCandidate]
FoundLinks = Iterable[Link]
CandidatesFromPage = Callable[[Link], Iterable[InstallationCandidate]]
PageValidator = Callable[[Link], bool]
class LinkSource:
@property
def link(self) -> Optional[Link]:
"""Returns the underlying link, if there's one."""
raise NotImplementedError()
def page_candidates(self) -> FoundCandidates:
"""Candidates found by parsing an archive listing HTML file."""
raise NotImplementedError()
def file_links(self) -> FoundLinks:
"""Links found by specifying archives directly."""
raise NotImplementedError()
def _is_html_file(file_url: str) -> bool:
return mimetypes.guess_type(file_url, strict=False)[0] == "text/html"
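# Quick sketch (standard mimetypes behaviour, hypothetical paths):
#
#     _is_html_file("file:///srv/links/index.html")      # -> True
#     _is_html_file("file:///srv/links/pkg-1.0.tar.gz")  # -> False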
class _FlatDirectorySource(LinkSource):
"""Link source specified by ``--find-links=<path-to-dir>``.
This scans the content of the directory, and returns:
* ``page_candidates``: Links listed on each HTML file in the directory.
* ``file_candidates``: Archives in the directory.
"""
def __init__(
self,
candidates_from_page: CandidatesFromPage,
path: str,
) -> None:
self._candidates_from_page = candidates_from_page
self._path = pathlib.Path(os.path.realpath(path))
@property
def link(self) -> Optional[Link]:
return None
def page_candidates(self) -> FoundCandidates:
for path in self._path.iterdir():
url = path_to_url(str(path))
if not _is_html_file(url):
continue
yield from self._candidates_from_page(Link(url))
def file_links(self) -> FoundLinks:
for path in self._path.iterdir():
url = path_to_url(str(path))
if _is_html_file(url):
continue
yield Link(url)
class _LocalFileSource(LinkSource):
"""``--find-links=<path-or-url>`` or ``--[extra-]index-url=<path-or-url>``.
If a URL is supplied, it must be a ``file:`` URL. If a path is supplied to
the option, it is converted to a URL first. This returns:
* ``page_candidates``: Links listed on an HTML file.
* ``file_candidates``: The non-HTML file.
"""
def __init__(
self,
candidates_from_page: CandidatesFromPage,
link: Link,
) -> None:
self._candidates_from_page = candidates_from_page
self._link = link
@property
def link(self) -> Optional[Link]:
return self._link
def page_candidates(self) -> FoundCandidates:
if not _is_html_file(self._link.url):
return
yield from self._candidates_from_page(self._link)
def file_links(self) -> FoundLinks:
if _is_html_file(self._link.url):
return
yield self._link
class _RemoteFileSource(LinkSource):
"""``--find-links=<url>`` or ``--[extra-]index-url=<url>``.
This returns:
* ``page_candidates``: Links listed on an HTML file.
* ``file_candidates``: The non-HTML file.
"""
def __init__(
self,
candidates_from_page: CandidatesFromPage,
page_validator: PageValidator,
link: Link,
) -> None:
self._candidates_from_page = candidates_from_page
self._page_validator = page_validator
self._link = link
@property
def link(self) -> Optional[Link]:
return self._link
def page_candidates(self) -> FoundCandidates:
if not self._page_validator(self._link):
return
yield from self._candidates_from_page(self._link)
def file_links(self) -> FoundLinks:
yield self._link
class _IndexDirectorySource(LinkSource):
"""``--[extra-]index-url=<path-to-directory>``.
This is treated like a remote URL; ``candidates_from_page`` contains logic
for this by appending ``index.html`` to the link.
"""
def __init__(
self,
candidates_from_page: CandidatesFromPage,
link: Link,
) -> None:
self._candidates_from_page = candidates_from_page
self._link = link
@property
def link(self) -> Optional[Link]:
return self._link
def page_candidates(self) -> FoundCandidates:
yield from self._candidates_from_page(self._link)
def file_links(self) -> FoundLinks:
return ()
def build_source(
location: str,
*,
candidates_from_page: CandidatesFromPage,
page_validator: PageValidator,
expand_dir: bool,
cache_link_parsing: bool,
) -> Tuple[Optional[str], Optional[LinkSource]]:
path: Optional[str] = None
url: Optional[str] = None
if os.path.exists(location): # Is a local path.
url = path_to_url(location)
path = location
elif location.startswith("file:"): # A file: URL.
url = location
path = url_to_path(location)
elif is_url(location):
url = location
if url is None:
msg = (
"Location '%s' is ignored: "
"it is either a non-existing path or lacks a specific scheme."
)
logger.warning(msg, location)
return (None, None)
if path is None:
source: LinkSource = _RemoteFileSource(
candidates_from_page=candidates_from_page,
page_validator=page_validator,
link=Link(url, cache_link_parsing=cache_link_parsing),
)
return (url, source)
if os.path.isdir(path):
if expand_dir:
source = _FlatDirectorySource(
candidates_from_page=candidates_from_page,
path=path,
)
else:
source = _IndexDirectorySource(
candidates_from_page=candidates_from_page,
link=Link(url, cache_link_parsing=cache_link_parsing),
)
return (url, source)
elif os.path.isfile(path):
source = _LocalFileSource(
candidates_from_page=candidates_from_page,
link=Link(url, cache_link_parsing=cache_link_parsing),
)
return (url, source)
logger.warning(
"Location '%s' is ignored: it is neither a file nor a directory.",
location,
)
return (url, None)
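# Illustrative sketch (not part of pip): how build_source() classifies a few
# locations. The callables are stand-in stubs; in pip they come from the link
# collector. The path in the second call is assumed not to exist.
def _example_build_source_usage() -> None:
    no_candidates: CandidatesFromPage = lambda link: []
    accept_all: PageValidator = lambda link: True

    # A remote index URL is not a local path, so a _RemoteFileSource is built.
    url, source = build_source(
        "https://example.org/simple/",
        candidates_from_page=no_candidates,
        page_validator=accept_all,
        expand_dir=False,
        cache_link_parsing=True,
    )
    assert url == "https://example.org/simple/"
    assert isinstance(source, _RemoteFileSource)

    # A non-existing path without a URL scheme is ignored with a warning.
    url, source = build_source(
        "definitely/missing/path",
        candidates_from_page=no_candidates,
        page_validator=accept_all,
        expand_dir=True,
        cache_link_parsing=False,
    )
    assert url is None and source is None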
import functools
import logging
import os
import pathlib
import sys
import sysconfig
from typing import Any, Dict, Generator, Optional, Tuple
from pip._internal.models.scheme import SCHEME_KEYS, Scheme
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.deprecation import deprecated
from pip._internal.utils.virtualenv import running_under_virtualenv
from . import _sysconfig
from .base import (
USER_CACHE_DIR,
get_major_minor_version,
get_src_prefix,
is_osx_framework,
site_packages,
user_site,
)
__all__ = [
"USER_CACHE_DIR",
"get_bin_prefix",
"get_bin_user",
"get_major_minor_version",
"get_platlib",
"get_purelib",
"get_scheme",
"get_src_prefix",
"site_packages",
"user_site",
]
logger = logging.getLogger(__name__)
_PLATLIBDIR: str = getattr(sys, "platlibdir", "lib")
_USE_SYSCONFIG_DEFAULT = sys.version_info >= (3, 10)
def _should_use_sysconfig() -> bool:
"""This function determines the value of _USE_SYSCONFIG.
By default, pip uses sysconfig on Python 3.10+.
But Python distributors can override this decision by setting:
sysconfig._PIP_USE_SYSCONFIG = True / False
Rationale in https://github.com/pypa/pip/issues/10647
This is a function for testability, but should be constant during any one
run.
"""
return bool(getattr(sysconfig, "_PIP_USE_SYSCONFIG", _USE_SYSCONFIG_DEFAULT))
_USE_SYSCONFIG = _should_use_sysconfig()
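# Illustrative sketch (not part of pip): a Python distributor can force the
# decision above by patching sysconfig before pip runs, e.g. from a shipped
# sitecustomize.py:
#
#     import sysconfig
#     sysconfig._PIP_USE_SYSCONFIG = False  # keep the distutils-based paths
#
# The flag is read exactly once, when this module is imported, so the override
# must already be in place at that point.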
if not _USE_SYSCONFIG:
# Import distutils lazily to avoid deprecation warnings,
# but import it soon enough that it is in memory and available during
# a pip reinstall.
from . import _distutils
# Be noisy about incompatibilities if this platform "should" be using
# sysconfig, but is explicitly opting out and using distutils instead.
if _USE_SYSCONFIG_DEFAULT and not _USE_SYSCONFIG:
_MISMATCH_LEVEL = logging.WARNING
else:
_MISMATCH_LEVEL = logging.DEBUG
def _looks_like_bpo_44860() -> bool:
"""The resolution to bpo-44860 will change this incorrect platlib.
See <https://bugs.python.org/issue44860>.
"""
from distutils.command.install import INSTALL_SCHEMES
try:
unix_user_platlib = INSTALL_SCHEMES["unix_user"]["platlib"]
except KeyError:
return False
return unix_user_platlib == "$usersite"
def _looks_like_red_hat_patched_platlib_purelib(scheme: Dict[str, str]) -> bool:
platlib = scheme["platlib"]
if "/$platlibdir/" in platlib:
platlib = platlib.replace("/$platlibdir/", f"/{_PLATLIBDIR}/")
if "/lib64/" not in platlib:
return False
unpatched = platlib.replace("/lib64/", "/lib/")
return unpatched.replace("$platbase/", "$base/") == scheme["purelib"]
@functools.lru_cache(maxsize=None)
def _looks_like_red_hat_lib() -> bool:
"""Red Hat patches platlib in unix_prefix and unix_home, but not purelib.
This is the only way I can see to tell a Red Hat-patched Python.
"""
from distutils.command.install import INSTALL_SCHEMES
return all(
k in INSTALL_SCHEMES
and _looks_like_red_hat_patched_platlib_purelib(INSTALL_SCHEMES[k])
for k in ("unix_prefix", "unix_home")
)
@functools.lru_cache(maxsize=None)
def _looks_like_debian_scheme() -> bool:
"""Debian adds two additional schemes."""
from distutils.command.install import INSTALL_SCHEMES
return "deb_system" in INSTALL_SCHEMES and "unix_local" in INSTALL_SCHEMES
@functools.lru_cache(maxsize=None)
def _looks_like_red_hat_scheme() -> bool:
"""Red Hat patches ``sys.prefix`` and ``sys.exec_prefix``.
Red Hat's ``00251-change-user-install-location.patch`` changes the install
command's ``prefix`` and ``exec_prefix`` to append ``"/local"``. This is
(fortunately?) done quite unconditionally, so we create a default command
object without any configuration to detect this.
"""
from distutils.command.install import install
from distutils.dist import Distribution
cmd: Any = install(Distribution())
cmd.finalize_options()
return (
cmd.exec_prefix == f"{os.path.normpath(sys.exec_prefix)}/local"
and cmd.prefix == f"{os.path.normpath(sys.prefix)}/local"
)
@functools.lru_cache(maxsize=None)
def _looks_like_slackware_scheme() -> bool:
"""Slackware patches sysconfig but fails to patch distutils and site.
Slackware changes sysconfig's user scheme to use ``"lib64"`` for the lib
path, but does not do the same to the site module.
"""
if user_site is None: # User-site not available.
return False
try:
paths = sysconfig.get_paths(scheme="posix_user", expand=False)
except KeyError: # User-site not available.
return False
return "/lib64/" in paths["purelib"] and "/lib64/" not in user_site
@functools.lru_cache(maxsize=None)
def _looks_like_msys2_mingw_scheme() -> bool:
"""MSYS2 patches distutils and sysconfig to use a UNIX-like scheme.
However, MSYS2 incorrectly patches the sysconfig ``nt`` scheme. The fix is
likely going to be included in their 3.10 release, so we ignore the warning.
See msys2/MINGW-packages#9319.
MSYS2 MINGW's patch uses lowercase ``"lib"`` instead of the usual uppercase,
and is missing the final ``"site-packages"``.
"""
paths = sysconfig.get_paths("nt", expand=False)
return all(
"Lib" not in p and "lib" in p and not p.endswith("site-packages")
for p in (paths[key] for key in ("platlib", "purelib"))
)
def _fix_abiflags(parts: Tuple[str, ...]) -> Generator[str, None, None]:
ldversion = sysconfig.get_config_var("LDVERSION")
abiflags = getattr(sys, "abiflags", None)
# If LDVERSION does not end with sys.abiflags, yield the parts unchanged.
if not ldversion or not abiflags or not ldversion.endswith(abiflags):
yield from parts
return
# Strip sys.abiflags from LDVERSION-based path components.
for part in parts:
if part.endswith(ldversion):
part = part[: (0 - len(abiflags))]
yield part
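# Illustrative sketch (not part of pip): on a hypothetical interpreter where
# LDVERSION is "3.7m" and sys.abiflags is "m", the generator above rewrites a
# distutils path component such as "python3.7m" to "python3.7":
#
#     tuple(_fix_abiflags(("usr", "include", "python3.7m")))
#     # -> ("usr", "include", "python3.7") on such an interpreter
#
# When LDVERSION does not end with abiflags (e.g. abiflags is empty), the
# parts are yielded unchanged.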
@functools.lru_cache(maxsize=None)
def _warn_mismatched(old: pathlib.Path, new: pathlib.Path, *, key: str) -> None:
issue_url = "https://github.com/pypa/pip/issues/10151"
message = (
"Value for %s does not match. Please report this to <%s>"
"\ndistutils: %s"
"\nsysconfig: %s"
)
logger.log(_MISMATCH_LEVEL, message, key, issue_url, old, new)
def _warn_if_mismatch(old: pathlib.Path, new: pathlib.Path, *, key: str) -> bool:
if old == new:
return False
_warn_mismatched(old, new, key=key)
return True
@functools.lru_cache(maxsize=None)
def _log_context(
*,
user: bool = False,
home: Optional[str] = None,
root: Optional[str] = None,
prefix: Optional[str] = None,
) -> None:
parts = [
"Additional context:",
"user = %r",
"home = %r",
"root = %r",
"prefix = %r",
]
logger.log(_MISMATCH_LEVEL, "\n".join(parts), user, home, root, prefix)
def get_scheme(
dist_name: str,
user: bool = False,
home: Optional[str] = None,
root: Optional[str] = None,
isolated: bool = False,
prefix: Optional[str] = None,
) -> Scheme:
new = _sysconfig.get_scheme(
dist_name,
user=user,
home=home,
root=root,
isolated=isolated,
prefix=prefix,
)
if _USE_SYSCONFIG:
return new
old = _distutils.get_scheme(
dist_name,
user=user,
home=home,
root=root,
isolated=isolated,
prefix=prefix,
)
warning_contexts = []
for k in SCHEME_KEYS:
old_v = pathlib.Path(getattr(old, k))
new_v = pathlib.Path(getattr(new, k))
if old_v == new_v:
continue
# distutils incorrectly put PyPy packages under ``site-packages/python``
# in the ``posix_home`` scheme, but PyPy devs said they expect the
# directory name to be ``pypy`` instead. So we treat this as a bug fix
# and not warn about it. See bpo-43307 and python/cpython#24628.
skip_pypy_special_case = (
sys.implementation.name == "pypy"
and home is not None
and k in ("platlib", "purelib")
and old_v.parent == new_v.parent
and old_v.name.startswith("python")
and new_v.name.startswith("pypy")
)
if skip_pypy_special_case:
continue
# sysconfig's ``osx_framework_user`` does not include ``pythonX.Y`` in
# the ``include`` value, but distutils's ``headers`` does. We'll let
# CPython decide whether this is a bug or feature. See bpo-43948.
skip_osx_framework_user_special_case = (
user
and is_osx_framework()
and k == "headers"
and old_v.parent.parent == new_v.parent
and old_v.parent.name.startswith("python")
)
if skip_osx_framework_user_special_case:
continue
# On Red Hat and derived Linux distributions, distutils is patched to
# use "lib64" instead of "lib" for platlib.
if k == "platlib" and _looks_like_red_hat_lib():
continue
# On Python 3.9+, sysconfig's posix_user scheme sets platlib against
# sys.platlibdir, but distutils's unix_user incorrectly continues
# using the same $usersite for both platlib and purelib. This creates a
# mismatch when sys.platlibdir is not "lib".
skip_bpo_44860 = (
user
and k == "platlib"
and not WINDOWS
and sys.version_info >= (3, 9)
and _PLATLIBDIR != "lib"
and _looks_like_bpo_44860()
)
if skip_bpo_44860:
continue
# Slackware incorrectly patches posix_user to use lib64 instead of lib,
# but not usersite to match the location.
skip_slackware_user_scheme = (
user
and k in ("platlib", "purelib")
and not WINDOWS
and _looks_like_slackware_scheme()
)
if skip_slackware_user_scheme:
continue
# Both Debian and Red Hat patch Python to place the system site under
# /usr/local instead of /usr. Debian also places lib in dist-packages
# instead of site-packages, but the /usr/local check should cover it.
skip_linux_system_special_case = (
not (user or home or prefix or running_under_virtualenv())
and old_v.parts[1:3] == ("usr", "local")
and len(new_v.parts) > 1
and new_v.parts[1] == "usr"
and (len(new_v.parts) < 3 or new_v.parts[2] != "local")
and (_looks_like_red_hat_scheme() or _looks_like_debian_scheme())
)
if skip_linux_system_special_case:
continue
# On Python 3.7 and earlier, sysconfig does not include sys.abiflags in
# the "pythonX.Y" part of the path, but distutils does.
skip_sysconfig_abiflag_bug = (
sys.version_info < (3, 8)
and not WINDOWS
and k in ("headers", "platlib", "purelib")
and tuple(_fix_abiflags(old_v.parts)) == new_v.parts
)
if skip_sysconfig_abiflag_bug:
continue
# MSYS2 MINGW's sysconfig patch does not include the "site-packages"
# part of the path. This is incorrect and will be fixed in MSYS.
skip_msys2_mingw_bug = (
WINDOWS and k in ("platlib", "purelib") and _looks_like_msys2_mingw_scheme()
)
if skip_msys2_mingw_bug:
continue
# CPython's POSIX install script invokes pip (via ensurepip) against the
# interpreter located in the source tree, not the install site. This
# triggers special logic in sysconfig that's not present in distutils.
# https://github.com/python/cpython/blob/8c21941ddaf/Lib/sysconfig.py#L178-L194
skip_cpython_build = (
sysconfig.is_python_build(check_home=True)
and not WINDOWS
and k in ("headers", "include", "platinclude")
)
if skip_cpython_build:
continue
warning_contexts.append((old_v, new_v, f"scheme.{k}"))
if not warning_contexts:
return old
# Check if this path mismatch is caused by distutils config files. Those
# files will no longer work once we switch to sysconfig, so this raises a
# deprecation message for them.
default_old = _distutils.distutils_scheme(
dist_name,
user,
home,
root,
isolated,
prefix,
ignore_config_files=True,
)
if any(default_old[k] != getattr(old, k) for k in SCHEME_KEYS):
deprecated(
reason=(
"Configuring installation scheme with distutils config files "
"is deprecated and will no longer work in the near future. If you "
"are using a Homebrew or Linuxbrew Python, please see discussion "
"at https://github.com/Homebrew/homebrew-core/issues/76621"
),
replacement=None,
gone_in=None,
)
return old
# Post warnings about these mismatches so the user can report them back.
for old_v, new_v, key in warning_contexts:
_warn_mismatched(old_v, new_v, key=key)
_log_context(user=user, home=home, root=root, prefix=prefix)
return old
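# Illustrative sketch (not part of pip): whichever backend is used above, the
# caller receives a Scheme whose attributes are the five install directories.
# The distribution name only influences the "headers" path.
def _example_get_scheme_usage() -> None:
    scheme = get_scheme("example-dist")  # hypothetical distribution name
    for key in SCHEME_KEYS:  # platlib, purelib, headers, scripts, data
        logger.debug("scheme.%s = %s", key, getattr(scheme, key))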
def get_bin_prefix() -> str:
new = _sysconfig.get_bin_prefix()
if _USE_SYSCONFIG:
return new
old = _distutils.get_bin_prefix()
if _warn_if_mismatch(pathlib.Path(old), pathlib.Path(new), key="bin_prefix"):
_log_context()
return old
def get_bin_user() -> str:
return _sysconfig.get_scheme("", user=True).scripts
def _looks_like_deb_system_dist_packages(value: str) -> bool:
"""Check if the value is Debian's APT-controlled dist-packages.
Debian's ``distutils.sysconfig.get_python_lib()`` implementation returns the
default package path controlled by APT, but does not patch ``sysconfig`` to
do the same. This is similar to the bug worked around in ``get_scheme()``,
but here the default is ``deb_system`` instead of ``unix_local``. Ultimately
we can't do anything about this Debian bug, and this detection allows us to
skip the warning when needed.
"""
if not _looks_like_debian_scheme():
return False
if value == "/usr/lib/python3/dist-packages":
return True
return False
def get_purelib() -> str:
"""Return the default pure-Python lib location."""
new = _sysconfig.get_purelib()
if _USE_SYSCONFIG:
return new
old = _distutils.get_purelib()
if _looks_like_deb_system_dist_packages(old):
return old
if _warn_if_mismatch(pathlib.Path(old), pathlib.Path(new), key="purelib"):
_log_context()
return old
def get_platlib() -> str:
"""Return the default platform-shared lib location."""
new = _sysconfig.get_platlib()
if _USE_SYSCONFIG:
return new
from . import _distutils
old = _distutils.get_platlib()
if _looks_like_deb_system_dist_packages(old):
return old
if _warn_if_mismatch(pathlib.Path(old), pathlib.Path(new), key="platlib"):
_log_context()
return old
"""Locations where we look for configs, install stuff, etc"""
# The following comment should be removed at some point in the future.
# mypy: strict-optional=False
# If pip's going to use distutils, it should not be using the copy that setuptools
# might have injected into the environment. This is done by removing the injected
# shim, if it's injected.
#
# See https://github.com/pypa/pip/issues/8761 for the original discussion and
# rationale for why this is done within pip.
try:
__import__("_distutils_hack").remove_shim()
except (ImportError, AttributeError):
pass
import logging
import os
import sys
from distutils.cmd import Command as DistutilsCommand
from distutils.command.install import SCHEME_KEYS
from distutils.command.install import install as distutils_install_command
from distutils.sysconfig import get_python_lib
from typing import Dict, List, Optional, Union, cast
from pip._internal.models.scheme import Scheme
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.virtualenv import running_under_virtualenv
from .base import get_major_minor_version
logger = logging.getLogger(__name__)
def distutils_scheme(
dist_name: str,
user: bool = False,
home: Optional[str] = None,
root: Optional[str] = None,
isolated: bool = False,
prefix: Optional[str] = None,
*,
ignore_config_files: bool = False,
) -> Dict[str, str]:
"""
Return a distutils install scheme
"""
from distutils.dist import Distribution
dist_args: Dict[str, Union[str, List[str]]] = {"name": dist_name}
if isolated:
dist_args["script_args"] = ["--no-user-cfg"]
d = Distribution(dist_args)
if not ignore_config_files:
try:
d.parse_config_files()
except UnicodeDecodeError:
# Typeshed does not include find_config_files() for some reason.
paths = d.find_config_files() # type: ignore
logger.warning(
"Ignore distutils configs in %s due to encoding errors.",
", ".join(os.path.basename(p) for p in paths),
)
obj: Optional[DistutilsCommand] = None
obj = d.get_command_obj("install", create=True)
assert obj is not None
i = cast(distutils_install_command, obj)
# NOTE: Setting user or home has the side effect of creating the home dir
# or user base for installations during finalize_options().
# Ideally, we'd prefer a scheme class that has no side effects.
assert not (user and prefix), f"user={user} prefix={prefix}"
assert not (home and prefix), f"home={home} prefix={prefix}"
i.user = user or i.user
if user or home:
i.prefix = ""
i.prefix = prefix or i.prefix
i.home = home or i.home
i.root = root or i.root
i.finalize_options()
scheme = {}
for key in SCHEME_KEYS:
scheme[key] = getattr(i, "install_" + key)
# install_lib specified in setup.cfg should install *everything*
# into there (i.e. it takes precedence over both purelib and
# platlib). Note, i.install_lib is *always* set after
# finalize_options(); we only want to override here if the user
# has explicitly requested it, hence going back to the config.
if "install_lib" in d.get_option_dict("install"):
scheme.update(dict(purelib=i.install_lib, platlib=i.install_lib))
if running_under_virtualenv():
if home:
prefix = home
elif user:
prefix = i.install_userbase
else:
prefix = i.prefix
scheme["headers"] = os.path.join(
prefix,
"include",
"site",
f"python{get_major_minor_version()}",
dist_name,
)
if root is not None:
path_no_drive = os.path.splitdrive(os.path.abspath(scheme["headers"]))[1]
scheme["headers"] = os.path.join(root, path_no_drive[1:])
return scheme
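# Illustrative sketch (not part of pip): for a default (non-user, non-home)
# install, the mapping returned above looks roughly like the following; the
# exact values depend on the interpreter, platform, and any config files:
#
#     {
#         "purelib": ".../lib/python3.X/site-packages",
#         "platlib": ".../lib/python3.X/site-packages",
#         "headers": ".../include/python3.X/<dist_name>",
#         "scripts": ".../bin",
#         "data": "...",
#     }
#
# get_scheme() below simply repackages this dict into a Scheme object.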
def get_scheme(
dist_name: str,
user: bool = False,
home: Optional[str] = None,
root: Optional[str] = None,
isolated: bool = False,
prefix: Optional[str] = None,
) -> Scheme:
"""
Get the "scheme" corresponding to the input parameters. The distutils
documentation provides the context for the available schemes:
https://docs.python.org/3/install/index.html#alternate-installation
:param dist_name: the name of the package to retrieve the scheme for, used
in the headers scheme path
:param user: indicates to use the "user" scheme
:param home: indicates to use the "home" scheme and provides the base
directory for the same
:param root: root under which other directories are re-based
:param isolated: equivalent to --no-user-cfg, i.e. do not consider
~/.pydistutils.cfg (posix) or ~/pydistutils.cfg (non-posix) for
scheme paths
:param prefix: indicates to use the "prefix" scheme and provides the
base directory for the same
"""
scheme = distutils_scheme(dist_name, user, home, root, isolated, prefix)
return Scheme(
platlib=scheme["platlib"],
purelib=scheme["purelib"],
headers=scheme["headers"],
scripts=scheme["scripts"],
data=scheme["data"],
)
def get_bin_prefix() -> str:
# XXX: In old virtualenv versions, sys.prefix can contain '..' components,
# so we need to call normpath to eliminate them.
prefix = os.path.normpath(sys.prefix)
if WINDOWS:
bin_py = os.path.join(prefix, "Scripts")
# buildout uses 'bin' on Windows too?
if not os.path.exists(bin_py):
bin_py = os.path.join(prefix, "bin")
return bin_py
# Forcing to use /usr/local/bin for standard macOS framework installs
# Also log to ~/Library/Logs/ for use with the Console.app log viewer
if sys.platform[:6] == "darwin" and prefix[:16] == "/System/Library/":
return "/usr/local/bin"
return os.path.join(prefix, "bin")
def get_purelib() -> str:
return get_python_lib(plat_specific=False)
def get_platlib() -> str:
return get_python_lib(plat_specific=True)
import logging
import os
import sys
import sysconfig
import typing
from pip._internal.exceptions import InvalidSchemeCombination, UserInstallationInvalid
from pip._internal.models.scheme import SCHEME_KEYS, Scheme
from pip._internal.utils.virtualenv import running_under_virtualenv
from .base import change_root, get_major_minor_version, is_osx_framework
logger = logging.getLogger(__name__)
# Notes on _infer_* functions.
# Unfortunately ``get_default_scheme()`` didn't exist before 3.10, so there's no
# way to ask things like "what is the '_prefix' scheme on this platform". These
# functions try to answer that with some heuristics while accounting for ad-hoc
# platforms not covered by CPython's default sysconfig implementation. If the
# ad-hoc implementation does not fully implement sysconfig, we'll fall back to
# a POSIX scheme.
_AVAILABLE_SCHEMES = set(sysconfig.get_scheme_names())
_PREFERRED_SCHEME_API = getattr(sysconfig, "get_preferred_scheme", None)
def _should_use_osx_framework_prefix() -> bool:
"""Check for Apple's ``osx_framework_library`` scheme.
Python distributed by Apple's Command Line Tools has this special scheme
that's used when:
* This is a framework build.
* We are installing into the system prefix.
This does not account for ``pip install --prefix`` (also means we're not
installing to the system prefix), which should use ``posix_prefix``, but
logic here means ``_infer_prefix()`` outputs ``osx_framework_library``. But
since ``prefix`` is not available for ``sysconfig.get_default_scheme()``,
which is the stdlib replacement for ``_infer_prefix()``, presumably Apple
wouldn't be able to magically switch between ``osx_framework_library`` and
``posix_prefix``. ``_infer_prefix()`` returning ``osx_framework_library``
means its behavior is consistent whether we use the stdlib implementation
or our own, and we deal with this special case in ``get_scheme()`` instead.
"""
return (
"osx_framework_library" in _AVAILABLE_SCHEMES
and not running_under_virtualenv()
and is_osx_framework()
)
def _infer_prefix() -> str:
"""Try to find a prefix scheme for the current platform.
This tries:
* A special ``osx_framework_library`` for Python distributed by Apple's
Command Line Tools, when not running in a virtual environment.
* Implementation + OS, used by PyPy on Windows (``pypy_nt``).
* Implementation without OS, used by PyPy on POSIX (``pypy``).
* OS + "prefix", used by CPython on POSIX (``posix_prefix``).
* Just the OS name, used by CPython on Windows (``nt``).
If none of the above works, fall back to ``posix_prefix``.
"""
if _PREFERRED_SCHEME_API:
return _PREFERRED_SCHEME_API("prefix")
if _should_use_osx_framework_prefix():
return "osx_framework_library"
implementation_suffixed = f"{sys.implementation.name}_{os.name}"
if implementation_suffixed in _AVAILABLE_SCHEMES:
return implementation_suffixed
if sys.implementation.name in _AVAILABLE_SCHEMES:
return sys.implementation.name
suffixed = f"{os.name}_prefix"
if suffixed in _AVAILABLE_SCHEMES:
return suffixed
if os.name in _AVAILABLE_SCHEMES: # On Windows, the prefix scheme is just called "nt".
return os.name
return "posix_prefix"
def _infer_user() -> str:
"""Try to find a user scheme for the current platform."""
if _PREFERRED_SCHEME_API:
return _PREFERRED_SCHEME_API("user")
if is_osx_framework() and not running_under_virtualenv():
suffixed = "osx_framework_user"
else:
suffixed = f"{os.name}_user"
if suffixed in _AVAILABLE_SCHEMES:
return suffixed
if "posix_user" not in _AVAILABLE_SCHEMES: # User scheme unavailable.
raise UserInstallationInvalid()
return "posix_user"
def _infer_home() -> str:
"""Try to find a home for the current platform."""
if _PREFERRED_SCHEME_API:
return _PREFERRED_SCHEME_API("home")
suffixed = f"{os.name}_home"
if suffixed in _AVAILABLE_SCHEMES:
return suffixed
return "posix_home"
# Update these keys if the user sets a custom home.
_HOME_KEYS = [
"installed_base",
"base",
"installed_platbase",
"platbase",
"prefix",
"exec_prefix",
]
if sysconfig.get_config_var("userbase") is not None:
_HOME_KEYS.append("userbase")
def get_scheme(
dist_name: str,
user: bool = False,
home: typing.Optional[str] = None,
root: typing.Optional[str] = None,
isolated: bool = False,
prefix: typing.Optional[str] = None,
) -> Scheme:
"""
Get the "scheme" corresponding to the input parameters.
:param dist_name: the name of the package to retrieve the scheme for, used
in the headers scheme path
:param user: indicates to use the "user" scheme
:param home: indicates to use the "home" scheme
:param root: root under which other directories are re-based
:param isolated: ignored, but kept for distutils compatibility (where
this controls whether the user-site pydistutils.cfg is honored)
:param prefix: indicates to use the "prefix" scheme and provides the
base directory for the same
"""
if user and prefix:
raise InvalidSchemeCombination("--user", "--prefix")
if home and prefix:
raise InvalidSchemeCombination("--home", "--prefix")
if home is not None:
scheme_name = _infer_home()
elif user:
scheme_name = _infer_user()
else:
scheme_name = _infer_prefix()
# Special case: When installing into a custom prefix, use posix_prefix
# instead of osx_framework_library. See _should_use_osx_framework_prefix()
# docstring for details.
if prefix is not None and scheme_name == "osx_framework_library":
scheme_name = "posix_prefix"
if home is not None:
variables = {k: home for k in _HOME_KEYS}
elif prefix is not None:
variables = {k: prefix for k in _HOME_KEYS}
else:
variables = {}
paths = sysconfig.get_paths(scheme=scheme_name, vars=variables)
# Logic here is very arbitrary, we're doing it for compatibility, don't ask.
# 1. Pip historically uses a special header path in virtual environments.
# 2. If the distribution name is not known, distutils uses 'UNKNOWN'. We
# only do the same when not running in a virtual environment because
# pip's historical header path logic (see point 1) did not do this.
if running_under_virtualenv():
if user:
base = variables.get("userbase", sys.prefix)
else:
base = variables.get("base", sys.prefix)
python_xy = f"python{get_major_minor_version()}"
paths["include"] = os.path.join(base, "include", "site", python_xy)
elif not dist_name:
dist_name = "UNKNOWN"
scheme = Scheme(
platlib=paths["platlib"],
purelib=paths["purelib"],
headers=os.path.join(paths["include"], dist_name),
scripts=paths["scripts"],
data=paths["data"],
)
if root is not None:
for key in SCHEME_KEYS:
value = change_root(root, getattr(scheme, key))
setattr(scheme, key, value)
return scheme
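# Illustrative sketch (not part of pip): mutually exclusive options fail fast
# in the checks at the top of get_scheme(), e.g.
#
#     get_scheme("example", user=True, prefix="/opt/py")  # hypothetical call
#     # -> raises InvalidSchemeCombination("--user", "--prefix")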
def get_bin_prefix() -> str:
# Forcing to use /usr/local/bin for standard macOS framework installs.
if sys.platform[:6] == "darwin" and sys.prefix[:16] == "/System/Library/":
return "/usr/local/bin"
return sysconfig.get_paths()["scripts"]
def get_purelib() -> str:
return sysconfig.get_paths()["purelib"]
def get_platlib() -> str:
return sysconfig.get_paths()["platlib"]
import functools
import os
import site
import sys
import sysconfig
import typing
from pip._internal.exceptions import InstallationError
from pip._internal.utils import appdirs
from pip._internal.utils.virtualenv import running_under_virtualenv
# Application Directories
USER_CACHE_DIR = appdirs.user_cache_dir("pip")
# FIXME doesn't account for venv linked to global site-packages
site_packages: str = sysconfig.get_path("purelib")
def get_major_minor_version() -> str:
"""
Return the major-minor version of the current Python as a string, e.g.
"3.7" or "3.10".
"""
return "{}.{}".format(*sys.version_info)
def change_root(new_root: str, pathname: str) -> str:
"""Return 'pathname' with 'new_root' prepended.
If 'pathname' is relative, this is equivalent to os.path.join(new_root, pathname).
Otherwise, it requires making 'pathname' relative and then joining the
two, which is tricky on DOS/Windows and Mac OS.
This is borrowed from Python's standard library's distutils module.
"""
if os.name == "posix":
if not os.path.isabs(pathname):
return os.path.join(new_root, pathname)
else:
return os.path.join(new_root, pathname[1:])
elif os.name == "nt":
(drive, path) = os.path.splitdrive(pathname)
if path[0] == "\\":
path = path[1:]
return os.path.join(new_root, path)
else:
raise InstallationError(
f"Unknown platform: {os.name}\n"
"Can not change root path prefix on unknown platform."
)
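# Illustrative sketch (not part of pip): on POSIX an absolute path loses its
# leading slash before being re-rooted, so
#
#     change_root("/tmp/stage", "/usr/lib/python3/foo")
#     # -> "/tmp/stage/usr/lib/python3/foo"
#
# while a relative path is simply joined onto the new root.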
def get_src_prefix() -> str:
if running_under_virtualenv():
src_prefix = os.path.join(sys.prefix, "src")
else:
# FIXME: keep src in cwd for now (it is not a temporary folder)
try:
src_prefix = os.path.join(os.getcwd(), "src")
except OSError:
# In case the current working directory has been renamed or deleted
sys.exit("The folder you are executing pip from can no longer be found.")
# under macOS + virtualenv sys.prefix is not properly resolved
# it is something like /path/to/python/bin/..
return os.path.abspath(src_prefix)
try:
# Use getusersitepackages if this is present, as it ensures that the
# value is initialised properly.
user_site: typing.Optional[str] = site.getusersitepackages()
except AttributeError:
user_site = site.USER_SITE
@functools.lru_cache(maxsize=None)
def is_osx_framework() -> bool:
return bool(sysconfig.get_config_var("PYTHONFRAMEWORK"))