Skip to content
GitLab
Explore
Sign in
Hide whitespace changes
Inline
Side-by-side
Some changes are not shown.
For a faster browsing experience, only
8 of 348+
files are shown.
server-temp/venv/lib/python3.10/site-packages/pip/_internal/exceptions.py
deleted
100644 → 0
View file @
03bf2ef9
"""
Exceptions used throughout package.
This module MUST NOT try to import from anything within `pip._internal` to
operate. This is expected to be importable from any/all files within the
subpackage and, thus, should not depend on them.
"""
import
configparser
import
contextlib
import
locale
import
logging
import
pathlib
import
re
import
sys
from
itertools
import
chain
,
groupby
,
repeat
from
typing
import
TYPE_CHECKING
,
Dict
,
Iterator
,
List
,
Optional
,
Union
from
pip._vendor.requests.models
import
Request
,
Response
from
pip._vendor.rich.console
import
Console
,
ConsoleOptions
,
RenderResult
from
pip._vendor.rich.markup
import
escape
from
pip._vendor.rich.text
import
Text
if
TYPE_CHECKING
:
from
hashlib
import
_Hash
from
typing
import
Literal
from
pip._internal.metadata
import
BaseDistribution
from
pip._internal.req.req_install
import
InstallRequirement
logger
=
logging
.
getLogger
(
__name__
)
#
# Scaffolding
#
def
_is_kebab_case
(
s
:
str
)
->
bool
:
return
re
.
match
(
r
"
^[a-z]+(-[a-z]+)*$
"
,
s
)
is
not
None
def _prefix_with_indent(
    s: Union[Text, str],
    console: Console,
    *,
    prefix: str,
    indent: str,
) -> Text:
    """Render *s* with *prefix* before its first line and *indent* before the rest."""
    rendered = s if isinstance(s, Text) else console.render_str(s)
    lead = console.render_str(prefix, overflow="ignore")
    joiner = console.render_str(f"\n{indent}", overflow="ignore")
    return lead + joiner.join(rendered.split(allow_blank=True))
class PipError(Exception):
    """Base class for all errors raised by pip."""
class DiagnosticPipError(PipError):
    """An error, that presents diagnostic information to the user.

    This contains a bunch of logic, to enable pretty presentation of our error
    messages. Each error gets a unique reference. Each error can also include
    additional context, a hint and/or a note -- which are presented with the
    main error message in a consistent style.

    This is adapted from the error output styling in `sphinx-theme-builder`.
    """

    # Subclasses must set a kebab-case reference (or pass one to __init__).
    reference: str

    def __init__(
        self,
        *,
        kind: 'Literal["error", "warning"]' = "error",
        reference: Optional[str] = None,
        message: Union[str, Text],
        context: Optional[Union[str, Text]],
        hint_stmt: Optional[Union[str, Text]],
        note_stmt: Optional[Union[str, Text]] = None,
        link: Optional[str] = None,
    ) -> None:
        # Ensure a proper reference is provided.
        if reference is None:
            assert hasattr(self, "reference"), "error reference not provided!"
            reference = self.reference
        assert _is_kebab_case(reference), "error reference must be kebab-case!"

        self.kind = kind
        self.reference = reference

        self.message = message
        self.context = context

        self.note_stmt = note_stmt
        self.hint_stmt = hint_stmt

        self.link = link

        super().__init__(f"<{self.__class__.__name__}: {self.reference}>")

    def __repr__(self) -> str:
        return (
            f"<{self.__class__.__name__}("
            f"reference={self.reference!r}, "
            f"message={self.message!r}, "
            f"context={self.context!r}, "
            f"note_stmt={self.note_stmt!r}, "
            f"hint_stmt={self.hint_stmt!r}"
            ")>"
        )

    def __rich_console__(
        self,
        console: Console,
        options: ConsoleOptions,
    ) -> RenderResult:
        # Errors are red, warnings yellow; the colour threads through all markup.
        colour = "red" if self.kind == "error" else "yellow"

        yield f"[{colour} bold]{self.kind}[/]: [bold]{self.reference}[/]"
        yield ""

        if not options.ascii_only:
            # Present the main message, with relevant context indented.
            if self.context is not None:
                yield _prefix_with_indent(
                    self.message,
                    console,
                    prefix=f"[{colour}]×[/] ",
                    indent=f"[{colour}]│[/] ",
                )
                yield _prefix_with_indent(
                    self.context,
                    console,
                    prefix=f"[{colour}]╰─>[/] ",
                    indent=f"[{colour}]   [/] ",
                )
            else:
                yield _prefix_with_indent(
                    self.message,
                    console,
                    prefix="[red]×[/] ",
                    indent=" ",
                )
        else:
            # ASCII-only fallback: plain text, no box-drawing characters.
            yield self.message
            if self.context is not None:
                yield ""
                yield self.context

        if self.note_stmt is not None or self.hint_stmt is not None:
            yield ""

        if self.note_stmt is not None:
            yield _prefix_with_indent(
                self.note_stmt,
                console,
                prefix="[magenta bold]note[/]: ",
                indent="      ",
            )
        if self.hint_stmt is not None:
            yield _prefix_with_indent(
                self.hint_stmt,
                console,
                prefix="[cyan bold]hint[/]: ",
                indent="      ",
            )

        if self.link is not None:
            yield ""
            yield f"Link: {self.link}"
#
# Actual Errors
#
class ConfigurationError(PipError):
    """Raised for general problems in pip's configuration handling."""
class InstallationError(PipError):
    """Raised for general failures during installation."""
class UninstallationError(PipError):
    """Raised for general failures during uninstallation."""
class MissingPyProjectBuildRequires(DiagnosticPipError):
    """Raised when pyproject.toml has `build-system`, but no `build-system.requires`."""

    reference = "missing-pyproject-build-system-requires"

    def __init__(self, *, package: str) -> None:
        # `package` is interpolated into rich markup, so escape it first.
        super().__init__(
            message=f"Can not process {escape(package)}",
            context=Text(
                "This package has an invalid pyproject.toml file.\n"
                "The [build-system] table is missing the mandatory `requires` key."
            ),
            note_stmt="This is an issue with the package mentioned above, not pip.",
            hint_stmt=Text("See PEP 518 for the detailed specification."),
        )
class InvalidPyProjectBuildRequires(DiagnosticPipError):
    """Raised when pyproject.toml has an invalid `build-system.requires`."""

    reference = "invalid-pyproject-build-system-requires"

    def __init__(self, *, package: str, reason: str) -> None:
        # `package` is interpolated into rich markup, so escape it first;
        # `reason` explains what is wrong with the `requires` value.
        super().__init__(
            message=f"Can not process {escape(package)}",
            context=Text(
                "This package has an invalid `build-system.requires` key in "
                f"pyproject.toml.\n{reason}"
            ),
            note_stmt="This is an issue with the package mentioned above, not pip.",
            hint_stmt=Text("See PEP 518 for the detailed specification."),
        )
class NoneMetadataError(PipError):
    """Raised when accessing a Distribution's "METADATA" or "PKG-INFO".

    This signifies an inconsistency, when the Distribution claims to have
    the metadata file (if not, raise ``FileNotFoundError`` instead), but is
    not actually able to produce its content. This may be due to permission
    errors.
    """

    def __init__(
        self,
        dist: "BaseDistribution",
        metadata_name: str,
    ) -> None:
        """
        :param dist: A Distribution object.
        :param metadata_name: The name of the metadata being accessed
            (can be "METADATA" or "PKG-INFO").
        """
        self.metadata_name = metadata_name
        self.dist = dist

    def __str__(self) -> str:
        # Use `dist` in the error message because its stringification
        # includes more information, like the version and location.
        return f"None {self.metadata_name} metadata found for distribution: {self.dist}"
class UserInstallationInvalid(InstallationError):
    """A --user install is requested on an environment without user site."""

    def __str__(self) -> str:
        return "User base directory is not specified"
class InvalidSchemeCombination(InstallationError):
    """Raised when mutually incompatible install-scheme options are given."""

    def __str__(self) -> str:
        joined = ", ".join(map(str, self.args[:-1]))
        return f"Cannot set {joined} and {self.args[-1]} together"
class DistributionNotFound(InstallationError):
    """Raised when no distribution can be found to satisfy a requirement."""
class RequirementsFileParseError(InstallationError):
    """Raised when a line of a requirements file cannot be parsed."""
class BestVersionAlreadyInstalled(PipError):
    """Raised when the most up-to-date version of a package is already
    installed."""
class BadCommand(PipError):
    """Raised when virtualenv or a required command cannot be found."""
class CommandError(PipError):
    """Raised when the command-line arguments are invalid."""
class PreviousBuildDirError(PipError):
    """Raised when a conflicting build directory from a previous run exists."""
class NetworkConnectionError(PipError):
    """HTTP connection error"""

    def __init__(
        self,
        error_msg: str,
        response: Optional[Response] = None,
        request: Optional[Request] = None,
    ) -> None:
        """
        Initialize NetworkConnectionError with `request` and `response`
        objects.
        """
        self.response = response
        self.request = request
        self.error_msg = error_msg
        # When only a response was supplied, recover the request from it so
        # both attributes are populated where possible.
        if (
            self.response is not None
            and not self.request
            and hasattr(response, "request")
        ):
            self.request = self.response.request
        super().__init__(error_msg, response, request)

    def __str__(self) -> str:
        return str(self.error_msg)
class InvalidWheelFilename(InstallationError):
    """Raised when a wheel filename does not follow the naming convention."""
class UnsupportedWheel(InstallationError):
    """Raised when a wheel is not supported on this environment."""
class InvalidWheel(InstallationError):
    """Invalid (e.g. corrupt) wheel."""

    def __init__(self, location: str, name: str) -> None:
        # Where the wheel lives (path or URL) and the project name it is for.
        self.location = location
        self.name = name

    def __str__(self) -> str:
        return f"Wheel '{self.name}' located at {self.location} is invalid."
class MetadataInconsistent(InstallationError):
    """Built metadata contains inconsistent information.

    This is raised when the metadata contains values (e.g. name and version)
    that do not match the information previously obtained from sdist filename,
    user-supplied ``#egg=`` value, or an install requirement name.
    """

    def __init__(
        self, ireq: "InstallRequirement", field: str, f_val: str, m_val: str
    ) -> None:
        # f_val: the expected value; m_val: the value found in built metadata.
        self.ireq = ireq
        self.field = field
        self.f_val = f_val
        self.m_val = m_val

    def __str__(self) -> str:
        return (
            f"Requested {self.ireq} has inconsistent {self.field}: "
            f"expected {self.f_val!r}, but metadata has {self.m_val!r}"
        )
class LegacyInstallFailure(DiagnosticPipError):
    """Error occurred while executing `setup.py install`"""

    reference = "legacy-install-failure"

    def __init__(self, package_details: str) -> None:
        super().__init__(
            message="Encountered error while trying to install package.",
            context=package_details,
            hint_stmt="See above for output from the failure.",
            note_stmt="This is an issue with the package mentioned above, not pip.",
        )
class InstallationSubprocessError(DiagnosticPipError, InstallationError):
    """A subprocess call failed."""

    reference = "subprocess-exited-with-error"

    def __init__(
        self,
        *,
        command_description: str,
        exit_code: int,
        output_lines: Optional[List[str]],
    ) -> None:
        # If output was captured, show it inline (bracketed by markers);
        # otherwise direct the user to the log output already printed above.
        if output_lines is None:
            output_prompt = Text("See above for output.")
        else:
            output_prompt = (
                Text.from_markup(f"[red][{len(output_lines)} lines of output][/]\n")
                + Text("".join(output_lines))
                # Raw string: the backslash escapes "[" so rich does not treat
                # it as markup.
                + Text.from_markup(R"[red]\[end of output][/]")
            )

        super().__init__(
            message=(
                f"[green]{escape(command_description)}[/] did not run successfully.\n"
                f"exit code: {exit_code}"
            ),
            context=output_prompt,
            hint_stmt=None,
            note_stmt=(
                "This error originates from a subprocess, and is likely not a "
                "problem with pip."
            ),
        )

        self.command_description = command_description
        self.exit_code = exit_code

    def __str__(self) -> str:
        return f"{self.command_description} exited with {self.exit_code}"
class MetadataGenerationFailed(InstallationSubprocessError, InstallationError):
    """Raised when a build backend fails to produce package metadata."""

    reference = "metadata-generation-failed"

    def __init__(
        self,
        *,
        package_details: str,
    ) -> None:
        # Deliberately skip InstallationSubprocessError.__init__ (which would
        # require subprocess details) and invoke DiagnosticPipError.__init__
        # directly via the explicit two-argument super().
        super(InstallationSubprocessError, self).__init__(
            message="Encountered error while generating package metadata.",
            context=escape(package_details),
            hint_stmt="See above for details.",
            note_stmt="This is an issue with the package mentioned above, not pip.",
        )

    def __str__(self) -> str:
        return "metadata generation failed"
class HashErrors(InstallationError):
    """Multiple HashError instances rolled into one for reporting"""

    def __init__(self) -> None:
        self.errors: List["HashError"] = []

    def append(self, error: "HashError") -> None:
        """Add one HashError to the collection."""
        self.errors.append(error)

    def __str__(self) -> str:
        # Sort by recovery difficulty, then group identical error classes
        # under a single shared heading.
        self.errors.sort(key=lambda e: e.order)
        report = []
        for klass, members in groupby(self.errors, lambda e: e.__class__):
            report.append(klass.head)
            report.extend(member.body() for member in members)
        return "\n".join(report) if report else ""

    def __bool__(self) -> bool:
        return bool(self.errors)
class HashError(InstallationError):
    """
    A failure to verify a package against known-good hashes

    :cvar order: An int sorting hash exception classes by difficulty of
        recovery (lower being harder), so the user doesn't bother fretting
        about unpinned packages when he has deeper issues, like VCS
        dependencies, to deal with. Also keeps error reports in a
        deterministic order.
    :cvar head: A section heading for display above potentially many
        exceptions of this kind
    :ivar req: The InstallRequirement that triggered this error. This is
        pasted on after the exception is instantiated, because it's not
        typically available earlier.
    """

    req: Optional["InstallRequirement"] = None
    head = ""
    order: int = -1

    def body(self) -> str:
        """Return a summary of me for display under the heading.

        This default implementation simply prints a description of the
        triggering requirement.

        :param req: The InstallRequirement that provoked this error, with
            its link already populated by the resolver's _populate_link().
        """
        return f"    {self._requirement_name()}"

    def __str__(self) -> str:
        return f"{self.head}\n{self.body()}"

    def _requirement_name(self) -> str:
        """Return a description of the requirement that triggered me.

        This default implementation returns long description of the req, with
        line numbers
        """
        return str(self.req) if self.req else "unknown package"
class VcsHashUnsupported(HashError):
    """A hash was supplied for a version-control-system-based requirement,
    which pip has no way of hashing."""

    order = 0
    head = (
        "Can't verify hashes for these requirements because we don't "
        "have a way to hash version control repositories:"
    )
class DirectoryUrlHashUnsupported(HashError):
    """A hash was provided for a ``file://`` requirement that points to a
    directory; we don't have a method for hashing those.
    """

    order = 1
    head = (
        "Can't verify hashes for these file:// requirements because they "
        "point to directories:"
    )
class HashMissing(HashError):
    """A hash was needed for a requirement but is absent."""

    order = 2
    head = (
        "Hashes are required in --require-hashes mode, but they are "
        "missing from some requirements. Here is a list of those "
        "requirements along with the hashes their downloaded archives "
        "actually had. Add lines like these to your requirements files to "
        "prevent tampering. (If you did not enable --require-hashes "
        "manually, note that it turns on automatically when any package "
        "has a hash.)"
    )

    def __init__(self, gotten_hash: str) -> None:
        """
        :param gotten_hash: The hash of the (possibly malicious) archive we
            just downloaded
        """
        self.gotten_hash = gotten_hash

    def body(self) -> str:
        # Dodge circular import.
        from pip._internal.utils.hashes import FAVORITE_HASH

        package = None
        if self.req:
            # In the case of URL-based requirements, display the original URL
            # seen in the requirements file rather than the package name,
            # so the output can be directly copied into the requirements file.
            package = (
                self.req.original_link
                if self.req.original_link
                # In case someone feeds something downright stupid
                # to InstallRequirement's constructor.
                else getattr(self.req, "req", None)
            )
        return "    {} --hash={}:{}".format(
            package or "unknown package", FAVORITE_HASH, self.gotten_hash
        )
class HashUnpinned(HashError):
    """A requirement carried a hash but was not pinned to an exact version."""

    order = 3
    head = (
        "In --require-hashes mode, all requirements must have their "
        "versions pinned with ==. These do not:"
    )
class HashMismatch(HashError):
    """
    Distribution file hash values don't match.

    :ivar package_name: The name of the package that triggered the hash
        mismatch. Feel free to write to this after the exception is raise to
        improve its error message.
    """

    order = 4
    head = (
        "THESE PACKAGES DO NOT MATCH THE HASHES FROM THE REQUIREMENTS "
        "FILE. If you have updated the package versions, please update "
        "the hashes. Otherwise, examine the package contents carefully; "
        "someone may have tampered with them."
    )

    def __init__(self, allowed: Dict[str, List[str]], gots: Dict[str, "_Hash"]) -> None:
        """
        :param allowed: A dict of algorithm names pointing to lists of allowed
            hex digests
        :param gots: A dict of algorithm names pointing to hashes we
            actually got from the files under suspicion
        """
        self.allowed = allowed
        self.gots = gots

    def body(self) -> str:
        return "    {}:\n{}".format(self._requirement_name(), self._hash_comparison())

    def _hash_comparison(self) -> str:
        """
        Return a comparison of actual and expected hash values.

        Example::

            Expected sha256 abcdeabcdeabcdeabcdeabcdeabcdeabcdeabcdeabcde
                        or 123451234512345123451234512345123451234512345
            Got        bcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdef
        """

        def hash_then_or(hash_name: str) -> "chain[str]":
            # For now, all the decent hashes have 6-char names, so we can get
            # away with hard-coding space literals.
            return chain([hash_name], repeat("    or"))

        lines: List[str] = []
        for hash_name, expecteds in self.allowed.items():
            # First expected line carries the algorithm name; subsequent ones
            # carry the aligned "or" continuation from hash_then_or().
            prefix = hash_then_or(hash_name)
            lines.extend(
                ("        Expected {} {}".format(next(prefix), e)) for e in expecteds
            )
            lines.append(
                "              Got        {}\n".format(self.gots[hash_name].hexdigest())
            )
        return "\n".join(lines)
class UnsupportedPythonVersion(InstallationError):
    """The running Python version is rejected by the package's
    Requires-Python metadata."""
class ConfigurationFileCouldNotBeLoaded(ConfigurationError):
    """When there are errors while loading a configuration file"""

    def __init__(
        self,
        reason: str = "could not be loaded",
        fname: Optional[str] = None,
        error: Optional[configparser.Error] = None,
    ) -> None:
        super().__init__(error)
        self.reason = reason
        self.fname = fname
        self.error = error

    def __str__(self) -> str:
        # With a filename we can point at the file; otherwise fall back to the
        # underlying configparser error (one of the two must be present).
        if self.fname is not None:
            message_part = f" in {self.fname}."
        else:
            assert self.error is not None
            message_part = f".\n{self.error}\n"
        return f"Configuration file {self.reason}{message_part}"
# Default explanation shown for an externally managed environment when the
# EXTERNALLY-MANAGED marker file does not supply a message of its own.
_DEFAULT_EXTERNALLY_MANAGED_ERROR = f"""\
The Python environment under {sys.prefix} is managed externally, and may not be
manipulated by the user. Please use specific tooling from the distributor of
the Python installation to interact with this environment instead.
"""
class ExternallyManagedEnvironment(DiagnosticPipError):
    """The current environment is externally managed.

    This is raised when the current environment is externally managed, as
    defined by `PEP 668`_. The ``EXTERNALLY-MANAGED`` configuration is checked
    and displayed when the error is bubbled up to the user.

    :param error: The error message read from ``EXTERNALLY-MANAGED``.
    """

    reference = "externally-managed-environment"

    def __init__(self, error: Optional[str]) -> None:
        if error is None:
            context = Text(_DEFAULT_EXTERNALLY_MANAGED_ERROR)
        else:
            context = Text(error)
        super().__init__(
            message="This environment is externally managed",
            context=context,
            note_stmt=(
                "If you believe this is a mistake, please contact your "
                "Python installation or OS distribution provider. "
                "You can override this, at the risk of breaking your Python "
                "installation or OS, by passing --break-system-packages."
            ),
            hint_stmt=Text("See PEP 668 for the detailed specification."),
        )

    @staticmethod
    def _iter_externally_managed_error_keys() -> Iterator[str]:
        # LC_MESSAGES is in POSIX, but not the C standard. The most common
        # platform that does not implement this category is Windows, where
        # using other categories for console message localization is equally
        # unreliable, so we fall back to the locale-less vendor message. This
        # can always be re-evaluated when a vendor proposes a new alternative.
        try:
            category = locale.LC_MESSAGES
        except AttributeError:
            lang: Optional[str] = None
        else:
            lang, _ = locale.getlocale(category)
        if lang is not None:
            # Most specific first: full locale tag, then its language prefix.
            yield f"Error-{lang}"
            for sep in ("-", "_"):
                before, found, _ = lang.partition(sep)
                if not found:
                    continue
                yield f"Error-{before}"
        # Locale-less vendor message is the final fallback.
        yield "Error"

    @classmethod
    def from_config(
        cls,
        config: Union[pathlib.Path, str],
    ) -> "ExternallyManagedEnvironment":
        """Build the error from an ``EXTERNALLY-MANAGED`` marker file."""
        parser = configparser.ConfigParser(interpolation=None)
        try:
            parser.read(config, encoding="utf-8")
            section = parser["externally-managed"]
            # Prefer a locale-specific message, falling back to plain "Error".
            for key in cls._iter_externally_managed_error_keys():
                with contextlib.suppress(KeyError):
                    return cls(section[key])
        except KeyError:
            pass
        except (OSError, UnicodeDecodeError, configparser.ParsingError):
            from pip._internal.utils._log import VERBOSE

            exc_info = logger.isEnabledFor(VERBOSE)
            logger.warning("Failed to read %s", config, exc_info=exc_info)
        # Unreadable/incomplete marker file: fall back to the default message.
        return cls(None)
server-temp/venv/lib/python3.10/site-packages/pip/_internal/index/__init__.py
deleted
100644 → 0
View file @
03bf2ef9
"""
Index interaction code
"""
server-temp/venv/lib/python3.10/site-packages/pip/_internal/index/__pycache__/__init__.cpython-310.pyc
deleted
100644 → 0
View file @
03bf2ef9
File deleted
server-temp/venv/lib/python3.10/site-packages/pip/_internal/index/__pycache__/collector.cpython-310.pyc
deleted
100644 → 0
View file @
03bf2ef9
File deleted
server-temp/venv/lib/python3.10/site-packages/pip/_internal/index/__pycache__/package_finder.cpython-310.pyc
deleted
100644 → 0
View file @
03bf2ef9
File deleted
server-temp/venv/lib/python3.10/site-packages/pip/_internal/index/__pycache__/sources.cpython-310.pyc
deleted
100644 → 0
View file @
03bf2ef9
File deleted
server-temp/venv/lib/python3.10/site-packages/pip/_internal/index/collector.py
deleted
100644 → 0
View file @
03bf2ef9
"""
The main purpose of this module is to expose LinkCollector.collect_sources().
"""
import
collections
import
email.message
import
functools
import
itertools
import
json
import
logging
import
os
import
urllib.parse
import
urllib.request
from
html.parser
import
HTMLParser
from
optparse
import
Values
from
typing
import
(
TYPE_CHECKING
,
Callable
,
Dict
,
Iterable
,
List
,
MutableMapping
,
NamedTuple
,
Optional
,
Sequence
,
Tuple
,
Union
,
)
from
pip._vendor
import
requests
from
pip._vendor.requests
import
Response
from
pip._vendor.requests.exceptions
import
RetryError
,
SSLError
from
pip._internal.exceptions
import
NetworkConnectionError
from
pip._internal.models.link
import
Link
from
pip._internal.models.search_scope
import
SearchScope
from
pip._internal.network.session
import
PipSession
from
pip._internal.network.utils
import
raise_for_status
from
pip._internal.utils.filetypes
import
is_archive_file
from
pip._internal.utils.misc
import
redact_auth_from_url
from
pip._internal.vcs
import
vcs
from
.sources
import
CandidatesFromPage
,
LinkSource
,
build_source
if
TYPE_CHECKING
:
from
typing
import
Protocol
else
:
Protocol
=
object
logger
=
logging
.
getLogger
(
__name__
)
ResponseHeaders
=
MutableMapping
[
str
,
str
]
def _match_vcs_scheme(url: str) -> Optional[str]:
    """Look for VCS schemes in the URL.

    Returns the matched VCS scheme, or None if there's no match.
    """
    for scheme in vcs.schemes:
        # The length guard prevents an IndexError when the URL is exactly a
        # scheme name with nothing after it (previously url[len(scheme)]
        # was indexed unconditionally).
        if (
            url.lower().startswith(scheme)
            and len(url) > len(scheme)
            and url[len(scheme)] in "+:"
        ):
            return scheme
    return None
class
_NotAPIContent
(
Exception
):
def
__init__
(
self
,
content_type
:
str
,
request_desc
:
str
)
->
None
:
super
().
__init__
(
content_type
,
request_desc
)
self
.
content_type
=
content_type
self
.
request_desc
=
request_desc
def _ensure_api_header(response: Response) -> None:
    """
    Check the Content-Type header to ensure the response contains a Simple
    API Response.

    Raises `_NotAPIContent` if the content type is not a valid content-type.
    """
    content_type = response.headers.get("Content-Type", "Unknown")

    # Comparison is case-insensitive; the original header value is preserved
    # for the error message.
    content_type_l = content_type.lower()
    if content_type_l.startswith(
        (
            "text/html",
            "application/vnd.pypi.simple.v1+html",
            "application/vnd.pypi.simple.v1+json",
        )
    ):
        return

    raise _NotAPIContent(content_type, response.request.method)
class
_NotHTTP
(
Exception
):
pass
def _ensure_api_response(url: str, session: PipSession) -> None:
    """
    Send a HEAD request to the URL, and ensure the response contains a simple
    API Response.

    Raises `_NotHTTP` if the URL is not available for a HEAD request, or
    `_NotAPIContent` if the content type is not a valid content type.
    """
    scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url)
    # HEAD probing only makes sense over HTTP(S).
    if scheme not in {"http", "https"}:
        raise _NotHTTP()

    resp = session.head(url, allow_redirects=True)
    raise_for_status(resp)

    _ensure_api_header(resp)
def _get_simple_response(url: str, session: PipSession) -> Response:
    """Access an Simple API response with GET, and return the response.

    This consists of three parts:

    1. If the URL looks suspiciously like an archive, send a HEAD first to
       check the Content-Type is HTML or Simple API, to avoid downloading a
       large file. Raise `_NotHTTP` if the content type cannot be determined, or
       `_NotAPIContent` if it is not HTML or a Simple API.
    2. Actually perform the request. Raise HTTP exceptions on network failures.
    3. Check the Content-Type header to make sure we got a Simple API response,
       and raise `_NotAPIContent` otherwise.
    """
    if is_archive_file(Link(url).filename):
        _ensure_api_response(url, session=session)

    logger.debug("Getting page %s", redact_auth_from_url(url))

    resp = session.get(
        url,
        headers={
            # Prefer the PEP 691 JSON form, falling back through the HTML forms.
            "Accept": ", ".join(
                [
                    "application/vnd.pypi.simple.v1+json",
                    "application/vnd.pypi.simple.v1+html; q=0.1",
                    "text/html; q=0.01",
                ]
            ),
            # We don't want to blindly returned cached data for
            # /simple/, because authors generally expecting that
            # twine upload && pip install will function, but if
            # they've done a pip install in the last ~10 minutes
            # it won't. Thus by setting this to zero we will not
            # blindly use any cached data, however the benefit of
            # using max-age=0 instead of no-cache, is that we will
            # still support conditional requests, so we will still
            # minimize traffic sent in cases where the page hasn't
            # changed at all, we will just always incur the round
            # trip for the conditional GET now instead of only
            # once per 10 minutes.
            # For more information, please see pypa/pip#5670.
            "Cache-Control": "max-age=0",
        },
    )
    raise_for_status(resp)

    # The check for archives above only works if the url ends with
    # something that looks like an archive. However that is not a
    # requirement of an url. Unless we issue a HEAD request on every
    # url we cannot know ahead of time for sure if something is a
    # Simple API response or not. However we can check after we've
    # downloaded it.
    _ensure_api_header(resp)

    logger.debug(
        "Fetched page %s as %s",
        redact_auth_from_url(url),
        resp.headers.get("Content-Type", "Unknown"),
    )

    return resp
def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]:
    """Determine if we have any encoding information in our headers."""
    if not headers or "Content-Type" not in headers:
        return None
    # Let the stdlib email machinery parse the charset parameter out of the
    # Content-Type header value.
    msg = email.message.Message()
    msg["content-type"] = headers["Content-Type"]
    charset = msg.get_param("charset")
    return str(charset) if charset else None
class CacheablePageContent:
    """Hashable wrapper around an IndexContent, keyed by its URL, suitable
    for use as an ``lru_cache`` key."""

    def __init__(self, page: "IndexContent") -> None:
        assert page.cache_link_parsing
        self.page = page

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return False
        return self.page.url == other.page.url

    def __hash__(self) -> int:
        return hash(self.page.url)
class ParseLinks(Protocol):
    # Callback protocol: anything that turns an IndexContent into Links.
    def __call__(self, page: "IndexContent") -> Iterable[Link]:
        ...
def with_cached_index_content(fn: ParseLinks) -> ParseLinks:
    """
    Given a function that parses an Iterable[Link] from an IndexContent, cache the
    function's result (keyed by CacheablePageContent), unless the IndexContent
    `page` has `page.cache_link_parsing == False`.
    """

    @functools.lru_cache(maxsize=None)
    def wrapper(cacheable_page: CacheablePageContent) -> List[Link]:
        # Materialize to a list so the cached value can be returned repeatedly.
        return list(fn(cacheable_page.page))

    @functools.wraps(fn)
    def wrapper_wrapper(page: "IndexContent") -> List[Link]:
        if page.cache_link_parsing:
            return wrapper(CacheablePageContent(page))
        # Caching disabled for this page: parse fresh every time.
        return list(fn(page))

    return wrapper_wrapper
@with_cached_index_content
def parse_links(page: "IndexContent") -> Iterable[Link]:
    """
    Parse a Simple API's Index Content, and yield its anchor elements as Link objects.
    """

    content_type_l = page.content_type.lower()
    if content_type_l.startswith("application/vnd.pypi.simple.v1+json"):
        # PEP 691 JSON index: each entry of "files" becomes a Link.
        data = json.loads(page.content)
        for file in data.get("files", []):
            link = Link.from_json(file, page.url)
            if link is None:
                continue
            yield link
        return

    # HTML index: honour an optional <base href> when resolving anchors.
    parser = HTMLLinkParser(page.url)
    encoding = page.encoding or "utf-8"
    parser.feed(page.content.decode(encoding))

    url = page.url
    base_url = parser.base_url or url
    for anchor in parser.anchors:
        link = Link.from_element(anchor, page_url=url, base_url=base_url)
        if link is None:
            continue
        yield link
class IndexContent:
    """Represents one response (or page), along with its URL"""

    def __init__(
        self,
        content: bytes,
        content_type: str,
        encoding: Optional[str],
        url: str,
        cache_link_parsing: bool = True,
    ) -> None:
        """
        :param encoding: the encoding to decode the given content.
        :param url: the URL from which the HTML was downloaded.
        :param cache_link_parsing: whether links parsed from this page's url
            should be cached. PyPI index urls should have this set to False,
            for example.
        """
        self.cache_link_parsing = cache_link_parsing
        self.url = url
        self.encoding = encoding
        self.content_type = content_type
        self.content = content

    def __str__(self) -> str:
        return redact_auth_from_url(self.url)
class HTMLLinkParser(HTMLParser):
    """
    HTMLParser that keeps the first base HREF and a list of all anchor
    elements' attributes.
    """

    def __init__(self, url: str) -> None:
        super().__init__(convert_charrefs=True)
        self.url: str = url
        self.base_url: Optional[str] = None
        self.anchors: List[Dict[str, Optional[str]]] = []

    def handle_starttag(
        self, tag: str, attrs: List[Tuple[str, Optional[str]]]
    ) -> None:
        if tag == "a":
            self.anchors.append(dict(attrs))
            return
        # Only the first <base href> on the page wins.
        if tag == "base" and self.base_url is None:
            href = self.get_href(attrs)
            if href is not None:
                self.base_url = href

    def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]:
        """Return the value of the first "href" attribute, if any."""
        return next((value for name, value in attrs if name == "href"), None)
def _handle_get_simple_fail(
    link: Link,
    reason: Union[str, Exception],
    meth: Optional[Callable[..., None]] = None,
) -> None:
    """Log a failed index fetch for *link* via *meth* (default: logger.debug)."""
    log = meth if meth is not None else logger.debug
    log("Could not fetch URL %s: %s - skipping", link, reason)
def _make_index_content(
    response: Response, cache_link_parsing: bool = True
) -> IndexContent:
    """Build an IndexContent from a fetched response, detecting its encoding."""
    charset = _get_encoding_from_headers(response.headers)
    return IndexContent(
        response.content,
        response.headers["Content-Type"],
        encoding=charset,
        url=response.url,
        cache_link_parsing=cache_link_parsing,
    )
def _get_index_content(link: Link, *, session: PipSession) -> Optional["IndexContent"]:
    """Fetch the index content behind *link*, or None on any handled failure."""
    url = link.url.split("#", 1)[0]

    # Check for VCS schemes that do not support lookup as web pages.
    vcs_scheme = _match_vcs_scheme(url)
    if vcs_scheme:
        logger.warning(
            "Cannot look at %s URL %s because it does not support lookup as web pages.",
            vcs_scheme,
            link,
        )
        return None

    # Tack index.html onto file:// URLs that point to directories
    scheme, _, path, _, _, _ = urllib.parse.urlparse(url)
    if scheme == "file" and os.path.isdir(urllib.request.url2pathname(path)):
        # add trailing slash if not present so urljoin doesn't trim
        # final segment
        if not url.endswith("/"):
            url += "/"
        # TODO: In the future, it would be nice if pip supported PEP 691
        #       style responses in the file:// URLs, however there's no
        #       standard file extension for application/vnd.pypi.simple.v1+json
        #       so we'll need to come up with something on our own.
        url = urllib.parse.urljoin(url, "index.html")
        logger.debug(" file: URL is directory, getting %s", url)

    try:
        resp = _get_simple_response(url, session=session)
    except _NotHTTP:
        logger.warning(
            "Skipping page %s because it looks like an archive, and cannot "
            "be checked by a HTTP HEAD request.",
            link,
        )
    except _NotAPIContent as exc:
        logger.warning(
            "Skipping page %s because the %s request got Content-Type: %s. "
            "The only supported Content-Types are application/vnd.pypi.simple.v1+json, "
            "application/vnd.pypi.simple.v1+html, and text/html",
            link,
            exc.request_desc,
            exc.content_type,
        )
    except NetworkConnectionError as exc:
        _handle_get_simple_fail(link, exc)
    except RetryError as exc:
        _handle_get_simple_fail(link, exc)
    except SSLError as exc:
        reason = "There was a problem confirming the ssl certificate: "
        reason += str(exc)
        _handle_get_simple_fail(link, reason, meth=logger.info)
    except requests.ConnectionError as exc:
        _handle_get_simple_fail(link, f"connection error: {exc}")
    except requests.Timeout:
        _handle_get_simple_fail(link, "timed out")
    else:
        # Success path: wrap the response, honouring the link's caching flag.
        return _make_index_content(resp, cache_link_parsing=link.cache_link_parsing)
    return None
class CollectedSources(NamedTuple):
    """Link sources gathered from --find-links locations and index URLs."""

    # Entries may be None for locations that could not produce a LinkSource.
    find_links: Sequence[Optional[LinkSource]]
    index_urls: Sequence[Optional[LinkSource]]
class LinkCollector:
    """
    Responsible for collecting Link objects from all configured locations,
    making network requests as needed.

    The class's main method is its collect_sources() method.
    """

    def __init__(
        self,
        session: PipSession,
        search_scope: SearchScope,
    ) -> None:
        # Where to look (indexes + find-links) and the session used to fetch.
        self.search_scope = search_scope
        self.session = session

    @classmethod
    def create(
        cls,
        session: PipSession,
        options: Values,
        suppress_no_index: bool = False,
    ) -> "LinkCollector":
        """Build a collector from parsed command-line options.

        :param session: The Session to use to make requests.
        :param suppress_no_index: Whether to ignore the --no-index option
            when constructing the SearchScope object.
        """
        index_urls = [options.index_url] + options.extra_index_urls
        if options.no_index and not suppress_no_index:
            logger.debug(
                "Ignoring indexes: %s",
                ",".join(redact_auth_from_url(url) for url in index_urls),
            )
            index_urls = []

        # Make sure find_links is a list before passing to create().
        find_links = options.find_links or []

        search_scope = SearchScope.create(
            find_links=find_links,
            index_urls=index_urls,
            no_index=options.no_index,
        )
        # Fix: use ``cls`` instead of hard-coding ``LinkCollector`` so that a
        # subclass calling ``Subclass.create(...)`` gets an instance of itself.
        link_collector = cls(
            session=session,
            search_scope=search_scope,
        )
        return link_collector

    @property
    def find_links(self) -> List[str]:
        # Delegates to the search scope, which owns the configured locations.
        return self.search_scope.find_links

    def fetch_response(self, location: Link) -> Optional[IndexContent]:
        """
        Fetch an HTML page containing package links.
        """
        return _get_index_content(location, session=self.session)

    def collect_sources(
        self,
        project_name: str,
        candidates_from_page: CandidatesFromPage,
    ) -> CollectedSources:
        """Build (deduplicated) link sources for *project_name* from both
        index URLs and find-links locations.
        """
        # The OrderedDict calls deduplicate sources by URL.
        index_url_sources = collections.OrderedDict(
            build_source(
                loc,
                candidates_from_page=candidates_from_page,
                page_validator=self.session.is_secure_origin,
                expand_dir=False,
                cache_link_parsing=False,
            )
            for loc in self.search_scope.get_index_urls_locations(project_name)
        ).values()
        find_links_sources = collections.OrderedDict(
            build_source(
                loc,
                candidates_from_page=candidates_from_page,
                page_validator=self.session.is_secure_origin,
                expand_dir=True,
                cache_link_parsing=True,
            )
            for loc in self.find_links
        ).values()

        if logger.isEnabledFor(logging.DEBUG):
            lines = [
                f"* {s.link}"
                for s in itertools.chain(find_links_sources, index_url_sources)
                if s is not None and s.link is not None
            ]
            lines = [
                f"{len(lines)} location(s) to search "
                f"for versions of {project_name}:"
            ] + lines
            logger.debug("\n".join(lines))

        return CollectedSources(
            find_links=list(find_links_sources),
            index_urls=list(index_url_sources),
        )
server-temp/venv/lib/python3.10/site-packages/pip/_internal/index/package_finder.py
deleted
100644 → 0
View file @
03bf2ef9
"""
Routines related to PyPI, indexes
"""
import
enum
import
functools
import
itertools
import
logging
import
re
from
typing
import
TYPE_CHECKING
,
FrozenSet
,
Iterable
,
List
,
Optional
,
Set
,
Tuple
,
Union
from
pip._vendor.packaging
import
specifiers
from
pip._vendor.packaging.tags
import
Tag
from
pip._vendor.packaging.utils
import
canonicalize_name
from
pip._vendor.packaging.version
import
_BaseVersion
from
pip._vendor.packaging.version
import
parse
as
parse_version
from
pip._internal.exceptions
import
(
BestVersionAlreadyInstalled
,
DistributionNotFound
,
InvalidWheelFilename
,
UnsupportedWheel
,
)
from
pip._internal.index.collector
import
LinkCollector
,
parse_links
from
pip._internal.models.candidate
import
InstallationCandidate
from
pip._internal.models.format_control
import
FormatControl
from
pip._internal.models.link
import
Link
from
pip._internal.models.search_scope
import
SearchScope
from
pip._internal.models.selection_prefs
import
SelectionPreferences
from
pip._internal.models.target_python
import
TargetPython
from
pip._internal.models.wheel
import
Wheel
from
pip._internal.req
import
InstallRequirement
from
pip._internal.utils._log
import
getLogger
from
pip._internal.utils.filetypes
import
WHEEL_EXTENSION
from
pip._internal.utils.hashes
import
Hashes
from
pip._internal.utils.logging
import
indent_log
from
pip._internal.utils.misc
import
build_netloc
from
pip._internal.utils.packaging
import
check_requires_python
from
pip._internal.utils.unpacking
import
SUPPORTED_EXTENSIONS
if
TYPE_CHECKING
:
from
pip._vendor.typing_extensions
import
TypeGuard
__all__
=
[
"
FormatControl
"
,
"
BestCandidateResult
"
,
"
PackageFinder
"
]
logger
=
getLogger
(
__name__
)
BuildTag
=
Union
[
Tuple
[()],
Tuple
[
int
,
str
]]
CandidateSortingKey
=
Tuple
[
int
,
int
,
int
,
_BaseVersion
,
Optional
[
int
],
BuildTag
]
def _check_link_requires_python(
    link: Link,
    version_info: Tuple[int, int, int],
    ignore_requires_python: bool = False,
) -> bool:
    """
    Return whether the given Python version is compatible with a link's
    "Requires-Python" value.

    :param version_info: A 3-tuple of ints representing the Python
        major-minor-micro version to check.
    :param ignore_requires_python: Whether to ignore the "Requires-Python"
        value if the given Python version isn't compatible.
    """
    try:
        compatible = check_requires_python(
            link.requires_python,
            version_info=version_info,
        )
    except specifiers.InvalidSpecifier:
        # A malformed specifier is treated as "no constraint": log it and
        # keep the link.
        logger.debug(
            "Ignoring invalid Requires-Python (%r) for link: %s",
            link.requires_python,
            link,
        )
        return True

    if compatible:
        return True

    dotted_version = ".".join(map(str, version_info))
    if ignore_requires_python:
        # Caller asked us to keep incompatible links; note it at debug level.
        logger.debug(
            "Ignoring failed Requires-Python check (%s not in: %r) for link: %s",
            dotted_version,
            link.requires_python,
            link,
        )
        return True

    logger.verbose(
        "Link requires a different Python (%s not in: %r): %s",
        dotted_version,
        link.requires_python,
        link,
    )
    return False
class LinkType(enum.Enum):
    # Outcome of LinkEvaluator.evaluate_link(): either the link is a viable
    # installation candidate, or one of the reasons below disqualifies it.
    candidate = enum.auto()
    # The file belongs to a different project than the one requested.
    different_project = enum.auto()
    # The release is yanked (PEP 592) and yanked files are not allowed.
    yanked = enum.auto()
    # Extension/format is not allowed (e.g. binaries disallowed, unknown ext).
    format_unsupported = enum.auto()
    # Filename/version could not be parsed (e.g. invalid wheel filename).
    format_invalid = enum.auto()
    # Wheel tags or embedded -pyXY marker do not match the target Python.
    platform_mismatch = enum.auto()
    # The link's Requires-Python excludes the target Python version.
    requires_python_mismatch = enum.auto()
class LinkEvaluator:
    """
    Responsible for evaluating links for a particular project.
    """

    # Matches a trailing "-pyX[.Y]" marker in a version string (e.g. "-py3.9").
    _py_version_re = re.compile(r"-py([123]\.?[0-9]?)$")

    # Don't include an allow_yanked default value to make sure each call
    # site considers whether yanked releases are allowed. This also causes
    # that decision to be made explicit in the calling code, which helps
    # people when reading the code.
    def __init__(
        self,
        project_name: str,
        canonical_name: str,
        formats: FrozenSet[str],
        target_python: TargetPython,
        allow_yanked: bool,
        ignore_requires_python: Optional[bool] = None,
    ) -> None:
        """
        :param project_name: The user supplied package name.
        :param canonical_name: The canonical package name.
        :param formats: The formats allowed for this package. Should be a set
            with 'binary' or 'source' or both in it.
        :param target_python: The target Python interpreter to use when
            evaluating link compatibility. This is used, for example, to
            check wheel compatibility, as well as when checking the Python
            version, e.g. the Python version embedded in a link filename
            (or egg fragment) and against an HTML link's optional PEP 503
            "data-requires-python" attribute.
        :param allow_yanked: Whether files marked as yanked (in the sense
            of PEP 592) are permitted to be candidates for install.
        :param ignore_requires_python: Whether to ignore incompatible
            PEP 503 "data-requires-python" values in HTML links. Defaults
            to False.
        """
        if ignore_requires_python is None:
            ignore_requires_python = False

        self._allow_yanked = allow_yanked
        self._canonical_name = canonical_name
        self._ignore_requires_python = ignore_requires_python
        self._formats = formats
        self._target_python = target_python

        self.project_name = project_name

    def evaluate_link(self, link: Link) -> Tuple[LinkType, str]:
        """
        Determine whether a link is a candidate for installation.

        :return: A tuple (result, detail), where *result* is an enum
            representing whether the evaluation found a candidate, or the reason
            why one is not found. If a candidate is found, *detail* will be the
            candidate's version string; if one is not found, it contains the
            reason the link fails to qualify.

        NOTE: the checks below are order-sensitive — cheaper/broader
        disqualifiers run before wheel parsing and tag matching.
        """
        version = None
        if link.is_yanked and not self._allow_yanked:
            reason = link.yanked_reason or "<none given>"
            return (LinkType.yanked, f"yanked for reason: {reason}")

        if link.egg_fragment:
            # An explicit egg fragment names the project/version directly;
            # extension checks below are skipped in that case.
            egg_info = link.egg_fragment
            ext = link.ext
        else:
            egg_info, ext = link.splitext()
            if not ext:
                return (LinkType.format_unsupported, "not a file")
            if ext not in SUPPORTED_EXTENSIONS:
                return (
                    LinkType.format_unsupported,
                    f"unsupported archive format: {ext}",
                )
            if "binary" not in self._formats and ext == WHEEL_EXTENSION:
                reason = f"No binaries permitted for {self.project_name}"
                return (LinkType.format_unsupported, reason)
            if "macosx10" in link.path and ext == ".zip":
                # Historic exclusion of macosx10 .zip archives.
                return (LinkType.format_unsupported, "macosx10 one")
            if ext == WHEEL_EXTENSION:
                try:
                    wheel = Wheel(link.filename)
                except InvalidWheelFilename:
                    return (
                        LinkType.format_invalid,
                        "invalid wheel filename",
                    )
                if canonicalize_name(wheel.name) != self._canonical_name:
                    reason = f"wrong project name (not {self.project_name})"
                    return (LinkType.different_project, reason)

                supported_tags = self._target_python.get_tags()
                if not wheel.supported(supported_tags):
                    # Include the wheel's tags in the reason string to
                    # simplify troubleshooting compatibility issues.
                    file_tags = ", ".join(wheel.get_formatted_file_tags())
                    reason = (
                        f"none of the wheel's tags ({file_tags}) are compatible "
                        f"(run pip debug --verbose to show compatible tags)"
                    )
                    return (LinkType.platform_mismatch, reason)

                version = wheel.version

        # This should be up by the self.ok_binary check, but see issue 2700.
        if "source" not in self._formats and ext != WHEEL_EXTENSION:
            reason = f"No sources permitted for {self.project_name}"
            return (LinkType.format_unsupported, reason)

        if not version:
            # Fall back to parsing the version out of the egg fragment /
            # filename stem.
            version = _extract_version_from_fragment(
                egg_info,
                self._canonical_name,
            )
        if not version:
            reason = f"Missing project version for {self.project_name}"
            return (LinkType.format_invalid, reason)

        match = self._py_version_re.search(version)
        if match:
            # Strip a trailing "-pyX.Y" marker and verify it matches the
            # target interpreter.
            version = version[: match.start()]
            py_version = match.group(1)
            if py_version != self._target_python.py_version:
                return (
                    LinkType.platform_mismatch,
                    "Python version is incorrect",
                )

        supports_python = _check_link_requires_python(
            link,
            version_info=self._target_python.py_version_info,
            ignore_requires_python=self._ignore_requires_python,
        )
        if not supports_python:
            reason = f"{version} Requires-Python {link.requires_python}"
            return (LinkType.requires_python_mismatch, reason)

        logger.debug("Found link %s, version: %s", link, version)

        return (LinkType.candidate, version)
def filter_unallowed_hashes(
    candidates: List[InstallationCandidate],
    hashes: Optional[Hashes],
    project_name: str,
) -> List[InstallationCandidate]:
    """
    Filter out candidates whose hashes aren't allowed, and return a new
    list of candidates.

    If at least one candidate has an allowed hash, then all candidates with
    either an allowed hash or no hash specified are returned.  Otherwise,
    the given candidates are returned.

    Including the candidates with no hash specified when there is a match
    allows a warning to be logged if there is a more preferred candidate
    with no hash specified.  Returning all candidates in the case of no
    matches lets pip report the hash of the candidate that would otherwise
    have been installed (e.g. permitting the user to more easily update
    their requirements file with the desired hash).
    """
    if not hashes:
        logger.debug(
            "Given no hashes to check %s links for project %r: "
            "discarding no candidates",
            len(candidates),
            project_name,
        )
        # Make sure we're not returning back the given value.
        return list(candidates)

    matches_or_no_digest = []
    # Collect the non-matches for logging purposes.
    non_matches = []
    match_count = 0
    for candidate in candidates:
        link = candidate.link
        if not link.has_hash:
            # Unhashed links are kept alongside matches (see docstring).
            matches_or_no_digest.append(candidate)
        elif link.is_hash_allowed(hashes=hashes):
            match_count += 1
            matches_or_no_digest.append(candidate)
        else:
            non_matches.append(candidate)

    # Make sure we're not returning back the given value when falling back.
    filtered = matches_or_no_digest if match_count else list(candidates)

    if len(filtered) == len(candidates):
        discard_message = "discarding no candidates"
    else:
        discard_message = "discarding {} non-matches:\n  {}".format(
            len(non_matches),
            "\n  ".join(str(candidate.link) for candidate in non_matches),
        )

    logger.debug(
        "Checked %s links for project %r against %s hashes "
        "(%s matches, %s no digest): %s",
        len(candidates),
        project_name,
        hashes.digest_count,
        match_count,
        len(matches_or_no_digest) - match_count,
        discard_message,
    )

    return filtered
class CandidatePreferences:
    """
    Encapsulates some of the preferences for filtering and sorting
    InstallationCandidate objects.
    """

    def __init__(
        self,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
    ) -> None:
        """
        :param prefer_binary: Whether to sort wheels above source archives.
        :param allow_all_prereleases: Whether to allow all pre-releases.
        """
        # Plain data holder: store both flags as public attributes.
        self.prefer_binary = prefer_binary
        self.allow_all_prereleases = allow_all_prereleases
class BestCandidateResult:
    """A collection of candidates, returned by `PackageFinder.find_best_candidate`.

    This class is only intended to be instantiated by CandidateEvaluator's
    `compute_best_candidate()` method.
    """

    def __init__(
        self,
        candidates: List[InstallationCandidate],
        applicable_candidates: List[InstallationCandidate],
        best_candidate: Optional[InstallationCandidate],
    ) -> None:
        """
        :param candidates: A sequence of all available candidates found.
        :param applicable_candidates: The applicable candidates.
        :param best_candidate: The most preferred candidate found, or None
            if no applicable candidates were found.
        """
        # Sanity-check the invariants the evaluator is supposed to uphold.
        assert set(applicable_candidates) <= set(candidates)
        if best_candidate is None:
            # No winner implies nothing was applicable at all.
            assert not applicable_candidates
        else:
            assert best_candidate in applicable_candidates

        self.best_candidate = best_candidate
        self._candidates = candidates
        self._applicable_candidates = applicable_candidates

    def iter_all(self) -> Iterable[InstallationCandidate]:
        """Iterate through all candidates."""
        return iter(self._candidates)

    def iter_applicable(self) -> Iterable[InstallationCandidate]:
        """Iterate through the applicable candidates."""
        return iter(self._applicable_candidates)
class CandidateEvaluator:
    """
    Responsible for filtering and sorting candidates for installation based
    on what tags are valid.
    """

    @classmethod
    def create(
        cls,
        project_name: str,
        target_python: Optional[TargetPython] = None,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
        specifier: Optional[specifiers.BaseSpecifier] = None,
        hashes: Optional[Hashes] = None,
    ) -> "CandidateEvaluator":
        """Create a CandidateEvaluator object.

        :param target_python: The target Python interpreter to use when
            checking compatibility. If None (the default), a TargetPython
            object will be constructed from the running Python.
        :param specifier: An optional object implementing `filter`
            (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
            versions.
        :param hashes: An optional collection of allowed hashes.
        """
        if target_python is None:
            target_python = TargetPython()
        if specifier is None:
            # Empty specifier set: no version constraints.
            specifier = specifiers.SpecifierSet()

        supported_tags = target_python.get_tags()

        return cls(
            project_name=project_name,
            supported_tags=supported_tags,
            specifier=specifier,
            prefer_binary=prefer_binary,
            allow_all_prereleases=allow_all_prereleases,
            hashes=hashes,
        )

    def __init__(
        self,
        project_name: str,
        supported_tags: List[Tag],
        specifier: specifiers.BaseSpecifier,
        prefer_binary: bool = False,
        allow_all_prereleases: bool = False,
        hashes: Optional[Hashes] = None,
    ) -> None:
        """
        :param supported_tags: The PEP 425 tags supported by the target
            Python in order of preference (most preferred first).
        """
        self._allow_all_prereleases = allow_all_prereleases
        self._hashes = hashes
        self._prefer_binary = prefer_binary
        self._project_name = project_name
        self._specifier = specifier
        self._supported_tags = supported_tags
        # Since the index of the tag in the _supported_tags list is used
        # as a priority, precompute a map from tag to index/priority to be
        # used in wheel.find_most_preferred_tag.
        self._wheel_tag_preferences = {
            tag: idx for idx, tag in enumerate(supported_tags)
        }

    def get_applicable_candidates(
        self,
        candidates: List[InstallationCandidate],
    ) -> List[InstallationCandidate]:
        """
        Return the applicable candidates from a list of candidates.
        """
        # Using None infers from the specifier instead.
        allow_prereleases = self._allow_all_prereleases or None
        specifier = self._specifier
        versions = {
            str(v)
            for v in specifier.filter(
                # We turn the version object into a str here because otherwise
                # when we're debundled but setuptools isn't, Python will see
                # packaging.version.Version and
                # pkg_resources._vendor.packaging.version.Version as different
                # types. This way we'll use a str as a common data interchange
                # format. If we stop using the pkg_resources provided specifier
                # and start using our own, we can drop the cast to str().
                (str(c.version) for c in candidates),
                prereleases=allow_prereleases,
            )
        }

        # Again, converting version to str to deal with debundling.
        applicable_candidates = [c for c in candidates if str(c.version) in versions]

        filtered_applicable_candidates = filter_unallowed_hashes(
            candidates=applicable_candidates,
            hashes=self._hashes,
            project_name=self._project_name,
        )

        # Sorted ascending by preference; callers take max / last as best.
        return sorted(filtered_applicable_candidates, key=self._sort_key)

    def _sort_key(self, candidate: InstallationCandidate) -> CandidateSortingKey:
        """
        Function to pass as the `key` argument to a call to sorted() to sort
        InstallationCandidates by preference.

        Returns a tuple such that tuples sorting as greater using Python's
        default comparison operator are more preferred.

        The preference is as follows:

        First and foremost, candidates with allowed (matching) hashes are
        always preferred over candidates without matching hashes. This is
        because e.g. if the only candidate with an allowed hash is yanked,
        we still want to use that candidate.

        Second, excepting hash considerations, candidates that have been
        yanked (in the sense of PEP 592) are always less preferred than
        candidates that haven't been yanked. Then:

        If not finding wheels, they are sorted by version only.
        If finding wheels, then the sort order is by version, then:
          1. existing installs
          2. wheels ordered via Wheel.support_index_min(self._supported_tags)
          3. source archives
        If prefer_binary was set, then all wheels are sorted above sources.

        Note: it was considered to embed this logic into the Link
              comparison operators, but then different sdist links
              with the same version, would have to be considered equal
        """
        valid_tags = self._supported_tags
        support_num = len(valid_tags)
        build_tag: BuildTag = ()
        binary_preference = 0
        link = candidate.link
        if link.is_wheel:
            # can raise InvalidWheelFilename
            wheel = Wheel(link.filename)
            try:
                # Negated so that a *more* preferred tag (lower index) sorts
                # as a *greater* key.
                pri = -(
                    wheel.find_most_preferred_tag(
                        valid_tags, self._wheel_tag_preferences
                    )
                )
            except ValueError:
                raise UnsupportedWheel(
                    "{} is not a supported wheel for this platform. It "
                    "can't be sorted.".format(wheel.filename)
                )
            if self._prefer_binary:
                binary_preference = 1
            if wheel.build_tag is not None:
                match = re.match(r"^(\d+)(.*)$", wheel.build_tag)
                assert match is not None, "guaranteed by filename validation"
                build_tag_groups = match.groups()
                build_tag = (int(build_tag_groups[0]), build_tag_groups[1])
        else:  # sdist
            # Sdists sort below every supported wheel tag.
            pri = -(support_num)
        has_allowed_hash = int(link.is_hash_allowed(self._hashes))
        yank_value = -1 * int(link.is_yanked)  # -1 for yanked.
        return (
            has_allowed_hash,
            yank_value,
            binary_preference,
            candidate.version,
            pri,
            build_tag,
        )

    def sort_best_candidate(
        self,
        candidates: List[InstallationCandidate],
    ) -> Optional[InstallationCandidate]:
        """
        Return the best candidate per the instance's sort order, or None if
        no candidate is acceptable.
        """
        if not candidates:
            return None
        best_candidate = max(candidates, key=self._sort_key)
        return best_candidate

    def compute_best_candidate(
        self,
        candidates: List[InstallationCandidate],
    ) -> BestCandidateResult:
        """
        Compute and return a `BestCandidateResult` instance.
        """
        applicable_candidates = self.get_applicable_candidates(candidates)

        best_candidate = self.sort_best_candidate(applicable_candidates)

        return BestCandidateResult(
            candidates,
            applicable_candidates=applicable_candidates,
            best_candidate=best_candidate,
        )
class
PackageFinder
:
"""
This finds packages.
This is meant to match easy_install
'
s technique for looking for
packages, by reading pages and looking for appropriate links.
"""
    def __init__(
        self,
        link_collector: LinkCollector,
        target_python: TargetPython,
        allow_yanked: bool,
        format_control: Optional[FormatControl] = None,
        candidate_prefs: Optional[CandidatePreferences] = None,
        ignore_requires_python: Optional[bool] = None,
    ) -> None:
        """
        This constructor is primarily meant to be used by the create() class
        method and from tests.

        :param format_control: A FormatControl object, used to control
            the selection of source packages / binary packages when consulting
            the index and links.
        :param candidate_prefs: Options to use when creating a
            CandidateEvaluator object.
        """
        if candidate_prefs is None:
            candidate_prefs = CandidatePreferences()

        # Default to "no format restrictions" when not provided.
        format_control = format_control or FormatControl(set(), set())

        self._allow_yanked = allow_yanked
        self._candidate_prefs = candidate_prefs
        self._ignore_requires_python = ignore_requires_python
        self._link_collector = link_collector
        self._target_python = target_python

        self.format_control = format_control

        # These are boring links that have already been logged somehow.
        self._logged_links: Set[Tuple[Link, LinkType, str]] = set()
# Don't include an allow_yanked default value to make sure each call
# site considers whether yanked releases are allowed. This also causes
# that decision to be made explicit in the calling code, which helps
# people when reading the code.
    @classmethod
    def create(
        cls,
        link_collector: LinkCollector,
        selection_prefs: SelectionPreferences,
        target_python: Optional[TargetPython] = None,
    ) -> "PackageFinder":
        """Create a PackageFinder.

        :param selection_prefs: The candidate selection preferences, as a
            SelectionPreferences object.
        :param target_python: The target Python interpreter to use when
            checking compatibility. If None (the default), a TargetPython
            object will be constructed from the running Python.
        """
        if target_python is None:
            target_python = TargetPython()
        # Translate the selection preferences into candidate-sorting options.
        candidate_prefs = CandidatePreferences(
            prefer_binary=selection_prefs.prefer_binary,
            allow_all_prereleases=selection_prefs.allow_all_prereleases,
        )

        return cls(
            candidate_prefs=candidate_prefs,
            link_collector=link_collector,
            target_python=target_python,
            allow_yanked=selection_prefs.allow_yanked,
            format_control=selection_prefs.format_control,
            ignore_requires_python=selection_prefs.ignore_requires_python,
        )
    @property
    def target_python(self) -> TargetPython:
        # The interpreter whose compatibility (tags, Requires-Python) is checked.
        return self._target_python
    @property
    def search_scope(self) -> SearchScope:
        # Delegated to the link collector, which owns the scope.
        return self._link_collector.search_scope
    @search_scope.setter
    def search_scope(self, search_scope: SearchScope) -> None:
        # Replace the collector's scope in place; callers mutate it via this property.
        self._link_collector.search_scope = search_scope
    @property
    def find_links(self) -> List[str]:
        # Configured --find-links locations, via the collector.
        return self._link_collector.find_links
    @property
    def index_urls(self) -> List[str]:
        # Configured index URLs, via the search scope.
        return self.search_scope.index_urls
@property
def
trusted_hosts
(
self
)
->
Iterable
[
str
]:
for
host_port
in
self
.
_link_collector
.
session
.
pip_trusted_origins
:
yield
build_netloc
(
*
host_port
)
    @property
    def allow_all_prereleases(self) -> bool:
        # Mirrors the candidate preferences flag.
        return self._candidate_prefs.allow_all_prereleases
    def set_allow_all_prereleases(self) -> None:
        # One-way switch: once enabled, pre-releases stay allowed.
        self._candidate_prefs.allow_all_prereleases = True
    @property
    def prefer_binary(self) -> bool:
        # Mirrors the candidate preferences flag.
        return self._candidate_prefs.prefer_binary
    def set_prefer_binary(self) -> None:
        # One-way switch: once enabled, wheels stay preferred over sdists.
        self._candidate_prefs.prefer_binary = True
def
requires_python_skipped_reasons
(
self
)
->
List
[
str
]:
reasons
=
{
detail
for
_
,
result
,
detail
in
self
.
_logged_links
if
result
==
LinkType
.
requires_python_mismatch
}
return
sorted
(
reasons
)
def
make_link_evaluator
(
self
,
project_name
:
str
)
->
LinkEvaluator
:
canonical_name
=
canonicalize_name
(
project_name
)
formats
=
self
.
format_control
.
get_allowed_formats
(
canonical_name
)
return
LinkEvaluator
(
project_name
=
project_name
,
canonical_name
=
canonical_name
,
formats
=
formats
,
target_python
=
self
.
_target_python
,
allow_yanked
=
self
.
_allow_yanked
,
ignore_requires_python
=
self
.
_ignore_requires_python
,
)
def
_sort_links
(
self
,
links
:
Iterable
[
Link
])
->
List
[
Link
]:
"""
Returns elements of links in order, non-egg links first, egg links
second, while eliminating duplicates
"""
eggs
,
no_eggs
=
[],
[]
seen
:
Set
[
Link
]
=
set
()
for
link
in
links
:
if
link
not
in
seen
:
seen
.
add
(
link
)
if
link
.
egg_fragment
:
eggs
.
append
(
link
)
else
:
no_eggs
.
append
(
link
)
return
no_eggs
+
eggs
def
_log_skipped_link
(
self
,
link
:
Link
,
result
:
LinkType
,
detail
:
str
)
->
None
:
entry
=
(
link
,
result
,
detail
)
if
entry
not
in
self
.
_logged_links
:
# Put the link at the end so the reason is more visible and because
# the link string is usually very long.
logger
.
debug
(
"
Skipping link: %s: %s
"
,
detail
,
link
)
self
.
_logged_links
.
add
(
entry
)
def
get_install_candidate
(
self
,
link_evaluator
:
LinkEvaluator
,
link
:
Link
)
->
Optional
[
InstallationCandidate
]:
"""
If the link is a candidate for install, convert it to an
InstallationCandidate and return it. Otherwise, return None.
"""
result
,
detail
=
link_evaluator
.
evaluate_link
(
link
)
if
result
!=
LinkType
.
candidate
:
self
.
_log_skipped_link
(
link
,
result
,
detail
)
return
None
return
InstallationCandidate
(
name
=
link_evaluator
.
project_name
,
link
=
link
,
version
=
detail
,
)
def
evaluate_links
(
self
,
link_evaluator
:
LinkEvaluator
,
links
:
Iterable
[
Link
]
)
->
List
[
InstallationCandidate
]:
"""
Convert links that are candidates to InstallationCandidate objects.
"""
candidates
=
[]
for
link
in
self
.
_sort_links
(
links
):
candidate
=
self
.
get_install_candidate
(
link_evaluator
,
link
)
if
candidate
is
not
None
:
candidates
.
append
(
candidate
)
return
candidates
def
process_project_url
(
self
,
project_url
:
Link
,
link_evaluator
:
LinkEvaluator
)
->
List
[
InstallationCandidate
]:
logger
.
debug
(
"
Fetching project page and analyzing links: %s
"
,
project_url
,
)
index_response
=
self
.
_link_collector
.
fetch_response
(
project_url
)
if
index_response
is
None
:
return
[]
page_links
=
list
(
parse_links
(
index_response
))
with
indent_log
():
package_links
=
self
.
evaluate_links
(
link_evaluator
,
links
=
page_links
,
)
return
package_links
    # NOTE(review): lru_cache on an instance method keys on ``self`` and keeps
    # the finder (and everything it references) alive for the cache's lifetime;
    # acceptable only if the finder's lifetime ≈ the process's — confirm.
    @functools.lru_cache(maxsize=None)
    def find_all_candidates(self, project_name: str) -> List[InstallationCandidate]:
        """Find all available InstallationCandidate for project_name

        This checks index_urls and find_links.
        All versions found are returned as an InstallationCandidate list.

        See LinkEvaluator.evaluate_link() for details on which files
        are accepted.
        """
        link_evaluator = self.make_link_evaluator(project_name)

        collected_sources = self._link_collector.collect_sources(
            project_name=project_name,
            candidates_from_page=functools.partial(
                self.process_project_url,
                link_evaluator=link_evaluator,
            ),
        )

        # Candidates discovered by fetching and parsing index pages.
        page_candidates_it = itertools.chain.from_iterable(
            source.page_candidates()
            for sources in collected_sources
            for source in sources
            if source is not None
        )
        page_candidates = list(page_candidates_it)

        # Candidates discovered as direct file links (e.g. local directories).
        file_links_it = itertools.chain.from_iterable(
            source.file_links()
            for sources in collected_sources
            for source in sources
            if source is not None
        )
        file_candidates = self.evaluate_links(
            link_evaluator,
            sorted(file_links_it, reverse=True),
        )

        if logger.isEnabledFor(logging.DEBUG) and file_candidates:
            paths = []
            for candidate in file_candidates:
                assert candidate.link.url  # we need to have a URL
                try:
                    paths.append(candidate.link.file_path)
                except Exception:
                    paths.append(candidate.link.url)  # it's not a local file

            logger.debug("Local files found: %s", ", ".join(paths))

        # This is an intentional priority ordering
        return file_candidates + page_candidates
def
make_candidate_evaluator
(
self
,
project_name
:
str
,
specifier
:
Optional
[
specifiers
.
BaseSpecifier
]
=
None
,
hashes
:
Optional
[
Hashes
]
=
None
,
)
->
CandidateEvaluator
:
"""
Create a CandidateEvaluator object to use.
"""
candidate_prefs
=
self
.
_candidate_prefs
return
CandidateEvaluator
.
create
(
project_name
=
project_name
,
target_python
=
self
.
_target_python
,
prefer_binary
=
candidate_prefs
.
prefer_binary
,
allow_all_prereleases
=
candidate_prefs
.
allow_all_prereleases
,
specifier
=
specifier
,
hashes
=
hashes
,
)
@functools.lru_cache(maxsize=None)
def find_best_candidate(
    self,
    project_name: str,
    specifier: Optional[specifiers.BaseSpecifier] = None,
    hashes: Optional[Hashes] = None,
) -> BestCandidateResult:
    """Find matches for the given project and specifier.

    :param specifier: An optional object implementing `filter`
        (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
        versions.

    :return: A `BestCandidateResult` instance.
    """
    # NOTE(review): lru_cache on an instance method keys on `self` and
    # keeps the finder alive for the cache's lifetime (ruff B019);
    # presumably intentional here since finders are long-lived — confirm.
    all_candidates = self.find_all_candidates(project_name)
    evaluator = self.make_candidate_evaluator(
        project_name=project_name,
        specifier=specifier,
        hashes=hashes,
    )
    return evaluator.compute_best_candidate(all_candidates)
def find_requirement(
    self, req: InstallRequirement, upgrade: bool
) -> Optional[InstallationCandidate]:
    """Try to find a Link matching req

    Expects req, an InstallRequirement and upgrade, a boolean
    Returns a InstallationCandidate if found,
    Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise
    """
    # Only hashes declared on the requirement itself are used here
    # (trust_internet=False).
    hashes = req.hashes(trust_internet=False)
    best_candidate_result = self.find_best_candidate(
        req.name,
        specifier=req.specifier,
        hashes=hashes,
    )
    best_candidate = best_candidate_result.best_candidate

    # Version of an already-installed distribution satisfying req, if any.
    installed_version: Optional[_BaseVersion] = None
    if req.satisfied_by is not None:
        installed_version = req.satisfied_by.version

    def _format_versions(cand_iter: Iterable[InstallationCandidate]) -> str:
        # This repeated parse_version and str() conversion is needed to
        # handle different vendoring sources from pip and pkg_resources.
        # If we stop using the pkg_resources provided specifier and start
        # using our own, we can drop the cast to str().
        return (
            ", ".join(
                sorted(
                    {str(c.version) for c in cand_iter},
                    key=parse_version,
                )
            )
            or "none"
        )

    # Nothing installed and nothing available: hard failure.
    if installed_version is None and best_candidate is None:
        logger.critical(
            "Could not find a version that satisfies the requirement %s "
            "(from versions: %s)",
            req,
            _format_versions(best_candidate_result.iter_all()),
        )

        raise DistributionNotFound(
            "No matching distribution found for {}".format(req)
        )

    # Closure over installed_version/best_candidate: decides whether the
    # best remote candidate should be installed over what is present.
    def _should_install_candidate(
        candidate: Optional[InstallationCandidate],
    ) -> "TypeGuard[InstallationCandidate]":
        if installed_version is None:
            return True
        if best_candidate is None:
            return False
        return best_candidate.version > installed_version

    # Not upgrading and something is installed: keep the installed
    # version (return None), logging which case applies.
    if not upgrade and installed_version is not None:
        if _should_install_candidate(best_candidate):
            logger.debug(
                "Existing installed version (%s) satisfies requirement "
                "(most up-to-date version is %s)",
                installed_version,
                best_candidate.version,
            )
        else:
            logger.debug(
                "Existing installed version (%s) is most up-to-date and "
                "satisfies requirement",
                installed_version,
            )
        return None

    # Upgrading (or nothing installed): install the best candidate when
    # it is newer than what is installed.
    if _should_install_candidate(best_candidate):
        logger.debug(
            "Using version %s (newest of versions: %s)",
            best_candidate.version,
            _format_versions(best_candidate_result.iter_applicable()),
        )
        return best_candidate

    # We have an existing version, and its the best version
    logger.debug(
        "Installed version (%s) is most up-to-date (past versions: %s)",
        installed_version,
        _format_versions(best_candidate_result.iter_applicable()),
    )
    raise BestVersionAlreadyInstalled
def _find_name_version_sep(fragment: str, canonical_name: str) -> int:
    """Locate the dash that separates name from version in *fragment*.

    :param fragment: A <package>+<version> filename "fragment" (stem) or
        egg fragment.
    :param canonical_name: The package's canonical name.

    This function is needed since the canonicalized name does not
    necessarily have the same length as the egg info's name part. An
    example::

    >>> fragment = 'foo__bar-1.0'
    >>> canonical_name = 'foo-bar'
    >>> _find_name_version_sep(fragment, canonical_name)
    8
    """
    # The name and version are separated by exactly one dash, but dashes
    # may also occur inside the name itself.  Probe each dash in order:
    # the first one whose prefix canonicalizes to the expected name is
    # the separator.
    sep = fragment.find("-")
    while sep != -1:
        if canonicalize_name(fragment[:sep]) == canonical_name:
            return sep
        sep = fragment.find("-", sep + 1)
    raise ValueError(f"{fragment} does not match {canonical_name}")
def _extract_version_from_fragment(
    fragment: str, canonical_name: str
) -> Optional[str]:
    """Return the version part of a <package>+<version> filename
    "fragment" (stem) or egg fragment, or None when it cannot be parsed.

    :param fragment: The string to parse. E.g. foo-2.1
    :param canonical_name: The canonicalized name of the package this
        belongs to.
    """
    try:
        sep = _find_name_version_sep(fragment, canonical_name)
    except ValueError:
        # The fragment does not belong to this package at all.
        return None
    # Everything after the separator dash is the version; an empty
    # remainder means there is no version to report.
    return fragment[sep + 1 :] or None
Prev
1
…
14
15
16
17
18
Next