From 3a40a8623e086bc27f72628e6a414be53a334141 Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Sun, 7 Jul 2024 20:05:27 +0100 Subject: [PATCH] Upgrade setuptools to 70.2.0 --- news/setuptools.vendor.rst | 1 + src/pip/_vendor/pkg_resources/__init__.py | 936 ++++++++++++++------ src/pip/_vendor/vendor.txt | 2 +- tools/vendoring/patches/pkg_resources.patch | 10 +- 4 files changed, 651 insertions(+), 298 deletions(-) create mode 100644 news/setuptools.vendor.rst diff --git a/news/setuptools.vendor.rst b/news/setuptools.vendor.rst new file mode 100644 index 00000000000..530d8912a07 --- /dev/null +++ b/news/setuptools.vendor.rst @@ -0,0 +1 @@ +Upgrade setuptools to 70.2.0 diff --git a/src/pip/_vendor/pkg_resources/__init__.py b/src/pip/_vendor/pkg_resources/__init__.py index 417a537d6f6..d5b4601439b 100644 --- a/src/pip/_vendor/pkg_resources/__init__.py +++ b/src/pip/_vendor/pkg_resources/__init__.py @@ -1,3 +1,6 @@ +# TODO: Add Generic type annotations to initialized collections. +# For now we'd simply use implicit Any/Unknown which would add redundant annotations +# mypy: disable-error-code="var-annotated" """ Package resource API -------------------- @@ -17,9 +20,11 @@ :mod:`importlib.metadata` and :pypi:`packaging` instead. """ +from __future__ import annotations + import sys -if sys.version_info < (3, 8): +if sys.version_info < (3, 8): # noqa: UP036 # Check for unsupported versions raise RuntimeError("Python 3.8 or later is required") import os @@ -27,7 +32,24 @@ import time import re import types -from typing import List, Protocol +from typing import ( + Any, + Literal, + Dict, + Iterator, + Mapping, + MutableSequence, + NamedTuple, + NoReturn, + Tuple, + Union, + TYPE_CHECKING, + Protocol, + Callable, + Iterable, + TypeVar, + overload, +) import zipfile import zipimport import warnings @@ -46,6 +68,7 @@ import ntpath import posixpath import importlib +import importlib.abc import importlib.machinery from pkgutil import get_importer @@ -53,6 +76,8 @@ # capture these to bypass sandboxing from os import utime +from os import open as os_open +from os.path import isdir, split try: from os import mkdir, rename, unlink @@ -62,42 +87,20 @@ # no write support, probably under GAE WRITE_SUPPORT = False -from os import open as os_open -from os.path import isdir, split - from pip._internal.utils._jaraco_text import ( yield_lines, drop_comment, join_continuation, ) +from pip._vendor.packaging import markers as _packaging_markers +from pip._vendor.packaging import requirements as _packaging_requirements +from pip._vendor.packaging import utils as _packaging_utils +from pip._vendor.packaging import version as _packaging_version +from pip._vendor.platformdirs import user_cache_dir as _user_cache_dir -from pip._vendor import platformdirs -from pip._vendor import packaging - -__import__('pip._vendor.packaging.version') -__import__('pip._vendor.packaging.specifiers') -__import__('pip._vendor.packaging.requirements') -__import__('pip._vendor.packaging.markers') -__import__('pip._vendor.packaging.utils') - -# declare some globals that will be defined later to -# satisfy the linters. -require = None -working_set = None -add_activation_listener = None -cleanup_resources = None -resource_stream = None -set_extraction_path = None -resource_isdir = None -resource_string = None -iter_entry_points = None -resource_listdir = None -resource_filename = None -resource_exists = None -_distribution_finders = None -_namespace_handlers = None -_namespace_packages = None - +if TYPE_CHECKING: + from _typeshed import BytesPath, StrPath, StrOrBytesPath + from pip._vendor.typing_extensions import Self warnings.warn( "pkg_resources is deprecated as an API. " @@ -107,6 +110,37 @@ ) +_T = TypeVar("_T") +_DistributionT = TypeVar("_DistributionT", bound="Distribution") +# Type aliases +_NestedStr = Union[str, Iterable[Union[str, Iterable["_NestedStr"]]]] +_InstallerTypeT = Callable[["Requirement"], "_DistributionT"] +_InstallerType = Callable[["Requirement"], Union["Distribution", None]] +_PkgReqType = Union[str, "Requirement"] +_EPDistType = Union["Distribution", _PkgReqType] +_MetadataType = Union["IResourceProvider", None] +_ResolvedEntryPoint = Any # Can be any attribute in the module +_ResourceStream = Any # TODO / Incomplete: A readable file-like object +# Any object works, but let's indicate we expect something like a module (optionally has __loader__ or __file__) +_ModuleLike = Union[object, types.ModuleType] +# Any: Should be _ModuleLike but we end up with issues where _ModuleLike doesn't have _ZipLoaderModule's __loader__ +_ProviderFactoryType = Callable[[Any], "IResourceProvider"] +_DistFinderType = Callable[[_T, str, bool], Iterable["Distribution"]] +_NSHandlerType = Callable[[_T, str, str, types.ModuleType], Union[str, None]] +_AdapterT = TypeVar( + "_AdapterT", _DistFinderType[Any], _ProviderFactoryType, _NSHandlerType[Any] +) + + +# Use _typeshed.importlib.LoaderProtocol once available https://github.com/python/typeshed/pull/11890 +class _LoaderProtocol(Protocol): + def load_module(self, fullname: str, /) -> types.ModuleType: ... + + +class _ZipLoaderModule(Protocol): + __loader__: zipimport.zipimporter + + _PEP440_FALLBACK = re.compile(r"^v?(?P(?:[0-9]+!)?[0-9]+(?:\.[0-9]+)*)", re.I) @@ -117,18 +151,18 @@ class PEP440Warning(RuntimeWarning): """ -parse_version = packaging.version.Version +parse_version = _packaging_version.Version -_state_vars = {} +_state_vars: dict[str, str] = {} -def _declare_state(vartype, **kw): - globals().update(kw) - _state_vars.update(dict.fromkeys(kw, vartype)) +def _declare_state(vartype: str, varname: str, initial_value: _T) -> _T: + _state_vars[varname] = vartype + return initial_value -def __getstate__(): +def __getstate__() -> dict[str, Any]: state = {} g = globals() for k, v in _state_vars.items(): @@ -136,7 +170,7 @@ def __getstate__(): return state -def __setstate__(state): +def __setstate__(state: dict[str, Any]) -> dict[str, Any]: g = globals() for k, v in state.items(): g['_sset_' + _state_vars[k]](k, g[k], v) @@ -291,17 +325,17 @@ class VersionConflict(ResolutionError): _template = "{self.dist} is installed but {self.req} is required" @property - def dist(self): + def dist(self) -> Distribution: return self.args[0] @property - def req(self): + def req(self) -> Requirement: return self.args[1] def report(self): return self._template.format(**locals()) - def with_context(self, required_by): + def with_context(self, required_by: set[Distribution | str]): """ If required_by is non-empty, return a version of self that is a ContextualVersionConflict. @@ -321,7 +355,7 @@ class ContextualVersionConflict(VersionConflict): _template = VersionConflict._template + ' by {self.required_by}' @property - def required_by(self): + def required_by(self) -> set[str]: return self.args[2] @@ -334,11 +368,11 @@ class DistributionNotFound(ResolutionError): ) @property - def req(self): + def req(self) -> Requirement: return self.args[0] @property - def requirers(self): + def requirers(self) -> set[str] | None: return self.args[1] @property @@ -358,7 +392,7 @@ class UnknownExtra(ResolutionError): """Distribution doesn't have an "extra feature" of the given name""" -_provider_factories = {} +_provider_factories: dict[type[_ModuleLike], _ProviderFactoryType] = {} PY_MAJOR = '{}.{}'.format(*sys.version_info) EGG_DIST = 3 @@ -368,7 +402,9 @@ class UnknownExtra(ResolutionError): DEVELOP_DIST = -1 -def register_loader_type(loader_type, provider_factory): +def register_loader_type( + loader_type: type[_ModuleLike], provider_factory: _ProviderFactoryType +): """Register `provider_factory` to make providers for `loader_type` `loader_type` is the type or class of a PEP 302 ``module.__loader__``, @@ -378,7 +414,11 @@ def register_loader_type(loader_type, provider_factory): _provider_factories[loader_type] = provider_factory -def get_provider(moduleOrReq): +@overload +def get_provider(moduleOrReq: str) -> IResourceProvider: ... +@overload +def get_provider(moduleOrReq: Requirement) -> Distribution: ... +def get_provider(moduleOrReq: str | Requirement) -> IResourceProvider | Distribution: """Return an IResourceProvider for the named module or requirement""" if isinstance(moduleOrReq, Requirement): return working_set.find(moduleOrReq) or require(str(moduleOrReq))[0] @@ -440,7 +480,7 @@ def get_build_platform(): get_platform = get_build_platform -def compatible_platforms(provided, required): +def compatible_platforms(provided: str | None, required: str | None): """Can code for the `provided` platform run on the `required` platform? Returns true if either platform is ``None``, or the platforms are equal. @@ -489,89 +529,106 @@ def compatible_platforms(provided, required): return False -def get_distribution(dist): +@overload +def get_distribution(dist: _DistributionT) -> _DistributionT: ... +@overload +def get_distribution(dist: _PkgReqType) -> Distribution: ... +def get_distribution(dist: Distribution | _PkgReqType) -> Distribution: """Return a current distribution object for a Requirement or string""" if isinstance(dist, str): dist = Requirement.parse(dist) if isinstance(dist, Requirement): - dist = get_provider(dist) + # Bad type narrowing, dist has to be a Requirement here, so get_provider has to return Distribution + dist = get_provider(dist) # type: ignore[assignment] if not isinstance(dist, Distribution): - raise TypeError("Expected string, Requirement, or Distribution", dist) + raise TypeError("Expected str, Requirement, or Distribution", dist) return dist -def load_entry_point(dist, group, name): +def load_entry_point(dist: _EPDistType, group: str, name: str) -> _ResolvedEntryPoint: """Return `name` entry point of `group` for `dist` or raise ImportError""" return get_distribution(dist).load_entry_point(group, name) -def get_entry_map(dist, group=None): +@overload +def get_entry_map( + dist: _EPDistType, group: None = None +) -> dict[str, dict[str, EntryPoint]]: ... +@overload +def get_entry_map(dist: _EPDistType, group: str) -> dict[str, EntryPoint]: ... +def get_entry_map(dist: _EPDistType, group: str | None = None): """Return the entry point map for `group`, or the full entry map""" return get_distribution(dist).get_entry_map(group) -def get_entry_info(dist, group, name): +def get_entry_info(dist: _EPDistType, group: str, name: str): """Return the EntryPoint object for `group`+`name`, or ``None``""" return get_distribution(dist).get_entry_info(group, name) class IMetadataProvider(Protocol): - def has_metadata(self, name) -> bool: + def has_metadata(self, name: str) -> bool: """Does the package's distribution contain the named metadata?""" - def get_metadata(self, name): + def get_metadata(self, name: str) -> str: """The named metadata resource as a string""" - def get_metadata_lines(self, name): + def get_metadata_lines(self, name: str) -> Iterator[str]: """Yield named metadata resource as list of non-blank non-comment lines Leading and trailing whitespace is stripped from each line, and lines with ``#`` as the first non-blank character are omitted.""" - def metadata_isdir(self, name) -> bool: + def metadata_isdir(self, name: str) -> bool: """Is the named metadata a directory? (like ``os.path.isdir()``)""" - def metadata_listdir(self, name): + def metadata_listdir(self, name: str) -> list[str]: """List of metadata names in the directory (like ``os.listdir()``)""" - def run_script(self, script_name, namespace): + def run_script(self, script_name: str, namespace: dict[str, Any]) -> None: """Execute the named script in the supplied namespace dictionary""" class IResourceProvider(IMetadataProvider, Protocol): """An object that provides access to package resources""" - def get_resource_filename(self, manager, resource_name): + def get_resource_filename( + self, manager: ResourceManager, resource_name: str + ) -> str: """Return a true filesystem path for `resource_name` - `manager` must be an ``IResourceManager``""" + `manager` must be a ``ResourceManager``""" - def get_resource_stream(self, manager, resource_name): + def get_resource_stream( + self, manager: ResourceManager, resource_name: str + ) -> _ResourceStream: """Return a readable file-like object for `resource_name` - `manager` must be an ``IResourceManager``""" + `manager` must be a ``ResourceManager``""" - def get_resource_string(self, manager, resource_name) -> bytes: + def get_resource_string( + self, manager: ResourceManager, resource_name: str + ) -> bytes: """Return the contents of `resource_name` as :obj:`bytes` - `manager` must be an ``IResourceManager``""" + `manager` must be a ``ResourceManager``""" - def has_resource(self, resource_name): + def has_resource(self, resource_name: str) -> bool: """Does the package contain the named resource?""" - def resource_isdir(self, resource_name): + def resource_isdir(self, resource_name: str) -> bool: """Is the named resource a directory? (like ``os.path.isdir()``)""" - def resource_listdir(self, resource_name): + def resource_listdir(self, resource_name: str) -> list[str]: """List of resource names in the directory (like ``os.listdir()``)""" class WorkingSet: """A collection of active distributions on sys.path (or a similar list)""" - def __init__(self, entries=None): + def __init__(self, entries: Iterable[str] | None = None): """Create working set from list of path entries (default=sys.path)""" - self.entries = [] + self.entries: list[str] = [] self.entry_keys = {} self.by_key = {} self.normalized_to_canonical_keys = {} @@ -625,7 +682,7 @@ def _build_from_requirements(cls, req_spec): sys.path[:] = ws.entries return ws - def add_entry(self, entry): + def add_entry(self, entry: str): """Add a path item to ``.entries``, finding any distributions on it ``find_distributions(entry, True)`` is used to find distributions @@ -640,11 +697,11 @@ def add_entry(self, entry): for dist in find_distributions(entry, True): self.add(dist, entry, False) - def __contains__(self, dist): + def __contains__(self, dist: Distribution) -> bool: """True if `dist` is the active distribution for its project""" return self.by_key.get(dist.key) == dist - def find(self, req): + def find(self, req: Requirement) -> Distribution | None: """Find a distribution matching requirement `req` If there is an active distribution for the requested project, this @@ -668,7 +725,7 @@ def find(self, req): raise VersionConflict(dist, req) return dist - def iter_entry_points(self, group, name=None): + def iter_entry_points(self, group: str, name: str | None = None): """Yield entry point objects from `group` matching `name` If `name` is None, yields all entry points in `group` from all @@ -682,7 +739,7 @@ def iter_entry_points(self, group, name=None): if name is None or name == entry.name ) - def run_script(self, requires, script_name): + def run_script(self, requires: str, script_name: str): """Locate distribution for `requires` and run `script_name` script""" ns = sys._getframe(1).f_globals name = ns['__name__'] @@ -690,13 +747,13 @@ def run_script(self, requires, script_name): ns['__name__'] = name self.require(requires)[0].run_script(script_name, ns) - def __iter__(self): + def __iter__(self) -> Iterator[Distribution]: """Yield distributions for non-duplicate projects in the working set The yield order is the order in which the items' path entries were added to the working set. """ - seen = {} + seen = set() for item in self.entries: if item not in self.entry_keys: # workaround a cache issue @@ -704,10 +761,16 @@ def __iter__(self): for key in self.entry_keys[item]: if key not in seen: - seen[key] = 1 + seen.add(key) yield self.by_key[key] - def add(self, dist, entry=None, insert=True, replace=False): + def add( + self, + dist: Distribution, + entry: str | None = None, + insert: bool = True, + replace: bool = False, + ): """Add `dist` to working set, associated with `entry` If `entry` is unspecified, it defaults to the ``.location`` of `dist`. @@ -731,7 +794,7 @@ def add(self, dist, entry=None, insert=True, replace=False): return self.by_key[dist.key] = dist - normalized_name = packaging.utils.canonicalize_name(dist.key) + normalized_name = _packaging_utils.canonicalize_name(dist.key) self.normalized_to_canonical_keys[normalized_name] = dist.key if dist.key not in keys: keys.append(dist.key) @@ -739,14 +802,42 @@ def add(self, dist, entry=None, insert=True, replace=False): keys2.append(dist.key) self._added_new(dist) + @overload def resolve( self, - requirements, - env=None, - installer=None, - replace_conflicting=False, - extras=None, - ): + requirements: Iterable[Requirement], + env: Environment | None, + installer: _InstallerTypeT[_DistributionT], + replace_conflicting: bool = False, + extras: tuple[str, ...] | None = None, + ) -> list[_DistributionT]: ... + @overload + def resolve( + self, + requirements: Iterable[Requirement], + env: Environment | None = None, + *, + installer: _InstallerTypeT[_DistributionT], + replace_conflicting: bool = False, + extras: tuple[str, ...] | None = None, + ) -> list[_DistributionT]: ... + @overload + def resolve( + self, + requirements: Iterable[Requirement], + env: Environment | None = None, + installer: _InstallerType | None = None, + replace_conflicting: bool = False, + extras: tuple[str, ...] | None = None, + ) -> list[Distribution]: ... + def resolve( + self, + requirements: Iterable[Requirement], + env: Environment | None = None, + installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None, + replace_conflicting: bool = False, + extras: tuple[str, ...] | None = None, + ) -> list[Distribution] | list[_DistributionT]: """List all distributions needed to (recursively) meet `requirements` `requirements` must be a sequence of ``Requirement`` objects. `env`, @@ -774,7 +865,7 @@ def resolve( # set up the stack requirements = list(requirements)[::-1] # set of processed requirements - processed = {} + processed = set() # key -> dist best = {} to_activate = [] @@ -808,14 +899,14 @@ def resolve( required_by[new_requirement].add(req.project_name) req_extras[new_requirement] = req.extras - processed[req] = True + processed.add(req) # return list of distros to activate return to_activate def _resolve_dist( self, req, best, replace_conflicting, env, installer, required_by, to_activate - ): + ) -> Distribution: dist = best.get(req.key) if dist is None: # Find the best distribution and add it to the map @@ -844,7 +935,41 @@ def _resolve_dist( raise VersionConflict(dist, req).with_context(dependent_req) return dist - def find_plugins(self, plugin_env, full_env=None, installer=None, fallback=True): + @overload + def find_plugins( + self, + plugin_env: Environment, + full_env: Environment | None, + installer: _InstallerTypeT[_DistributionT], + fallback: bool = True, + ) -> tuple[list[_DistributionT], dict[Distribution, Exception]]: ... + @overload + def find_plugins( + self, + plugin_env: Environment, + full_env: Environment | None = None, + *, + installer: _InstallerTypeT[_DistributionT], + fallback: bool = True, + ) -> tuple[list[_DistributionT], dict[Distribution, Exception]]: ... + @overload + def find_plugins( + self, + plugin_env: Environment, + full_env: Environment | None = None, + installer: _InstallerType | None = None, + fallback: bool = True, + ) -> tuple[list[Distribution], dict[Distribution, Exception]]: ... + def find_plugins( + self, + plugin_env: Environment, + full_env: Environment | None = None, + installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None, + fallback: bool = True, + ) -> tuple[ + list[Distribution] | list[_DistributionT], + dict[Distribution, Exception], + ]: """Find all activatable distributions in `plugin_env` Example usage:: @@ -883,8 +1008,8 @@ def find_plugins(self, plugin_env, full_env=None, installer=None, fallback=True) # scan project names in alphabetic order plugin_projects.sort() - error_info = {} - distributions = {} + error_info: dict[Distribution, Exception] = {} + distributions: dict[Distribution, Exception | None] = {} if full_env is None: env = Environment(self.entries) @@ -920,12 +1045,12 @@ def find_plugins(self, plugin_env, full_env=None, installer=None, fallback=True) # success, no need to try any more versions of this project break - distributions = list(distributions) - distributions.sort() + sorted_distributions = list(distributions) + sorted_distributions.sort() - return distributions, error_info + return sorted_distributions, error_info - def require(self, *requirements): + def require(self, *requirements: _NestedStr): """Ensure that distributions matching `requirements` are activated `requirements` must be a string or a (possibly-nested) sequence @@ -941,7 +1066,9 @@ def require(self, *requirements): return needed - def subscribe(self, callback, existing=True): + def subscribe( + self, callback: Callable[[Distribution], object], existing: bool = True + ): """Invoke `callback` for all distributions If `existing=True` (default), @@ -977,12 +1104,12 @@ def __setstate__(self, e_k_b_n_c): self.callbacks = callbacks[:] -class _ReqExtras(dict): +class _ReqExtras(Dict["Requirement", Tuple[str, ...]]): """ Map each requirement to the extras that demanded it. """ - def markers_pass(self, req, extras=None): + def markers_pass(self, req: Requirement, extras: tuple[str, ...] | None = None): """ Evaluate markers for req against each extra that demanded it. @@ -1001,7 +1128,10 @@ class Environment: """Searchable snapshot of distributions on a search path""" def __init__( - self, search_path=None, platform=get_supported_platform(), python=PY_MAJOR + self, + search_path: Iterable[str] | None = None, + platform: str | None = get_supported_platform(), + python: str | None = PY_MAJOR, ): """Snapshot distributions available on a search path @@ -1024,7 +1154,7 @@ def __init__( self.python = python self.scan(search_path) - def can_add(self, dist): + def can_add(self, dist: Distribution): """Is distribution `dist` acceptable for this environment? The distribution must match the platform and python version @@ -1038,11 +1168,11 @@ def can_add(self, dist): ) return py_compat and compatible_platforms(dist.platform, self.platform) - def remove(self, dist): + def remove(self, dist: Distribution): """Remove `dist` from the environment""" self._distmap[dist.key].remove(dist) - def scan(self, search_path=None): + def scan(self, search_path: Iterable[str] | None = None): """Scan `search_path` for distributions usable in this environment Any distributions found are added to the environment. @@ -1057,7 +1187,7 @@ def scan(self, search_path=None): for dist in find_distributions(item): self.add(dist) - def __getitem__(self, project_name): + def __getitem__(self, project_name: str) -> list[Distribution]: """Return a newest-to-oldest list of distributions for `project_name` Uses case-insensitive `project_name` comparison, assuming all the @@ -1068,7 +1198,7 @@ def __getitem__(self, project_name): distribution_key = project_name.lower() return self._distmap.get(distribution_key, []) - def add(self, dist): + def add(self, dist: Distribution): """Add `dist` if we ``can_add()`` it and it has not already been added""" if self.can_add(dist) and dist.has_version(): dists = self._distmap.setdefault(dist.key, []) @@ -1076,7 +1206,29 @@ def add(self, dist): dists.append(dist) dists.sort(key=operator.attrgetter('hashcmp'), reverse=True) - def best_match(self, req, working_set, installer=None, replace_conflicting=False): + @overload + def best_match( + self, + req: Requirement, + working_set: WorkingSet, + installer: _InstallerTypeT[_DistributionT], + replace_conflicting: bool = False, + ) -> _DistributionT: ... + @overload + def best_match( + self, + req: Requirement, + working_set: WorkingSet, + installer: _InstallerType | None = None, + replace_conflicting: bool = False, + ) -> Distribution | None: ... + def best_match( + self, + req: Requirement, + working_set: WorkingSet, + installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None, + replace_conflicting: bool = False, + ) -> Distribution | None: """Find distribution best matching `req` and usable on `working_set` This calls the ``find(req)`` method of the `working_set` to see if a @@ -1103,7 +1255,32 @@ def best_match(self, req, working_set, installer=None, replace_conflicting=False # try to download/install return self.obtain(req, installer) - def obtain(self, requirement, installer=None): + @overload + def obtain( + self, + requirement: Requirement, + installer: _InstallerTypeT[_DistributionT], + ) -> _DistributionT: ... + @overload + def obtain( + self, + requirement: Requirement, + installer: Callable[[Requirement], None] | None = None, + ) -> None: ... + @overload + def obtain( + self, + requirement: Requirement, + installer: _InstallerType | None = None, + ) -> Distribution | None: ... + def obtain( + self, + requirement: Requirement, + installer: Callable[[Requirement], None] + | _InstallerType + | None + | _InstallerTypeT[_DistributionT] = None, + ) -> Distribution | None: """Obtain a distribution matching `requirement` (e.g. via download) Obtain a distro that matches requirement (e.g. via download). In the @@ -1114,13 +1291,13 @@ def obtain(self, requirement, installer=None): to the `installer` argument.""" return installer(requirement) if installer else None - def __iter__(self): + def __iter__(self) -> Iterator[str]: """Yield the unique project names of the available distributions""" for key in self._distmap.keys(): if self[key]: yield key - def __iadd__(self, other): + def __iadd__(self, other: Distribution | Environment): """In-place addition of a distribution or environment""" if isinstance(other, Distribution): self.add(other) @@ -1132,7 +1309,7 @@ def __iadd__(self, other): raise TypeError("Can't add %r to environment" % (other,)) return self - def __add__(self, other): + def __add__(self, other: Distribution | Environment): """Add an environment or distribution to an environment""" new = self.__class__([], platform=None, python=None) for env in self, other: @@ -1159,46 +1336,54 @@ class ExtractionError(RuntimeError): The exception instance that caused extraction to fail """ + manager: ResourceManager + cache_path: str + original_error: BaseException | None + class ResourceManager: """Manage resource extraction and packages""" - extraction_path = None + extraction_path: str | None = None def __init__(self): self.cached_files = {} - def resource_exists(self, package_or_requirement, resource_name): + def resource_exists(self, package_or_requirement: _PkgReqType, resource_name: str): """Does the named resource exist?""" return get_provider(package_or_requirement).has_resource(resource_name) - def resource_isdir(self, package_or_requirement, resource_name): + def resource_isdir(self, package_or_requirement: _PkgReqType, resource_name: str): """Is the named resource an existing directory?""" return get_provider(package_or_requirement).resource_isdir(resource_name) - def resource_filename(self, package_or_requirement, resource_name): + def resource_filename( + self, package_or_requirement: _PkgReqType, resource_name: str + ): """Return a true filesystem path for specified resource""" return get_provider(package_or_requirement).get_resource_filename( self, resource_name ) - def resource_stream(self, package_or_requirement, resource_name): + def resource_stream(self, package_or_requirement: _PkgReqType, resource_name: str): """Return a readable file-like object for specified resource""" return get_provider(package_or_requirement).get_resource_stream( self, resource_name ) - def resource_string(self, package_or_requirement, resource_name) -> bytes: + def resource_string( + self, package_or_requirement: _PkgReqType, resource_name: str + ) -> bytes: """Return specified resource as :obj:`bytes`""" return get_provider(package_or_requirement).get_resource_string( self, resource_name ) - def resource_listdir(self, package_or_requirement, resource_name): + def resource_listdir(self, package_or_requirement: _PkgReqType, resource_name: str): """List the contents of the named resource directory""" return get_provider(package_or_requirement).resource_listdir(resource_name) - def extraction_error(self): + def extraction_error(self) -> NoReturn: """Give an error message for problems extracting file(s)""" old_exc = sys.exc_info()[1] @@ -1228,7 +1413,7 @@ def extraction_error(self): err.original_error = old_exc raise err - def get_cache_path(self, archive_name, names=()): + def get_cache_path(self, archive_name: str, names: Iterable[StrPath] = ()): """Return absolute location in cache for `archive_name` and `names` The parent directory of the resulting path will be created if it does @@ -1250,7 +1435,7 @@ def get_cache_path(self, archive_name, names=()): self._warn_unsafe_extraction_path(extract_path) - self.cached_files[target_path] = 1 + self.cached_files[target_path] = True return target_path @staticmethod @@ -1280,7 +1465,7 @@ def _warn_unsafe_extraction_path(path): ).format(**locals()) warnings.warn(msg, UserWarning) - def postprocess(self, tempname, filename): + def postprocess(self, tempname: StrOrBytesPath, filename: StrOrBytesPath): """Perform any platform-specific postprocessing of `tempname` This is where Mac header rewrites should be done; other platforms don't @@ -1300,7 +1485,7 @@ def postprocess(self, tempname, filename): mode = ((os.stat(tempname).st_mode) | 0o555) & 0o7777 os.chmod(tempname, mode) - def set_extraction_path(self, path): + def set_extraction_path(self, path: str): """Set the base path where resources will be extracted to, if needed. If you do not call this routine before any extractions take place, the @@ -1324,7 +1509,7 @@ def set_extraction_path(self, path): self.extraction_path = path - def cleanup_resources(self, force=False) -> List[str]: + def cleanup_resources(self, force: bool = False) -> list[str]: """ Delete all extracted resource files and directories, returning a list of the file and directory names that could not be successfully removed. @@ -1339,18 +1524,16 @@ def cleanup_resources(self, force=False) -> List[str]: return [] -def get_default_cache(): +def get_default_cache() -> str: """ Return the ``PYTHON_EGG_CACHE`` environment variable or a platform-relevant user cache dir for an app named "Python-Eggs". """ - return os.environ.get('PYTHON_EGG_CACHE') or platformdirs.user_cache_dir( - appname='Python-Eggs' - ) + return os.environ.get('PYTHON_EGG_CACHE') or _user_cache_dir(appname='Python-Eggs') -def safe_name(name): +def safe_name(name: str): """Convert an arbitrary string to a standard distribution name Any runs of non-alphanumeric/. characters are replaced with a single '-'. @@ -1358,14 +1541,14 @@ def safe_name(name): return re.sub('[^A-Za-z0-9.]+', '-', name) -def safe_version(version): +def safe_version(version: str): """ Convert an arbitrary string to a standard version string """ try: # normalize the version - return str(packaging.version.Version(version)) - except packaging.version.InvalidVersion: + return str(_packaging_version.Version(version)) + except _packaging_version.InvalidVersion: version = version.replace(' ', '.') return re.sub('[^A-Za-z0-9.]+', '-', version) @@ -1402,7 +1585,7 @@ def _safe_segment(segment): return re.sub(r'\.[^A-Za-z0-9]+', '.', segment).strip(".-") -def safe_extra(extra): +def safe_extra(extra: str): """Convert an arbitrary string to a standard 'extra' name Any runs of non-alphanumeric characters are replaced with a single '_', @@ -1411,7 +1594,7 @@ def safe_extra(extra): return re.sub('[^A-Za-z0-9.-]+', '_', extra).lower() -def to_filename(name): +def to_filename(name: str): """Convert a project or version name to its filename-escaped form Any '-' characters are currently replaced with '_'. @@ -1419,7 +1602,7 @@ def to_filename(name): return name.replace('-', '_') -def invalid_marker(text): +def invalid_marker(text: str): """ Validate text as a PEP 508 environment marker; return an exception if invalid or False otherwise. @@ -1433,7 +1616,7 @@ def invalid_marker(text): return False -def evaluate_marker(text, extra=None): +def evaluate_marker(text: str, extra: str | None = None) -> bool: """ Evaluate a PEP 508 environment marker. Return a boolean indicating the marker result in this environment. @@ -1442,46 +1625,48 @@ def evaluate_marker(text, extra=None): This implementation uses the 'pyparsing' module. """ try: - marker = packaging.markers.Marker(text) + marker = _packaging_markers.Marker(text) return marker.evaluate() - except packaging.markers.InvalidMarker as e: + except _packaging_markers.InvalidMarker as e: raise SyntaxError(e) from e class NullProvider: """Try to implement resources and metadata for arbitrary PEP 302 loaders""" - egg_name = None - egg_info = None - loader = None + egg_name: str | None = None + egg_info: str | None = None + loader: _LoaderProtocol | None = None - def __init__(self, module): + def __init__(self, module: _ModuleLike): self.loader = getattr(module, '__loader__', None) self.module_path = os.path.dirname(getattr(module, '__file__', '')) - def get_resource_filename(self, manager, resource_name): + def get_resource_filename(self, manager: ResourceManager, resource_name: str): return self._fn(self.module_path, resource_name) - def get_resource_stream(self, manager, resource_name): + def get_resource_stream(self, manager: ResourceManager, resource_name: str): return io.BytesIO(self.get_resource_string(manager, resource_name)) - def get_resource_string(self, manager, resource_name) -> bytes: + def get_resource_string( + self, manager: ResourceManager, resource_name: str + ) -> bytes: return self._get(self._fn(self.module_path, resource_name)) - def has_resource(self, resource_name): + def has_resource(self, resource_name: str): return self._has(self._fn(self.module_path, resource_name)) def _get_metadata_path(self, name): return self._fn(self.egg_info, name) - def has_metadata(self, name) -> bool: + def has_metadata(self, name: str) -> bool: if not self.egg_info: return False path = self._get_metadata_path(name) return self._has(path) - def get_metadata(self, name): + def get_metadata(self, name: str): if not self.egg_info: return "" path = self._get_metadata_path(name) @@ -1494,24 +1679,24 @@ def get_metadata(self, name): exc.reason += ' in {} file at path: {}'.format(name, path) raise - def get_metadata_lines(self, name): + def get_metadata_lines(self, name: str) -> Iterator[str]: return yield_lines(self.get_metadata(name)) - def resource_isdir(self, resource_name): + def resource_isdir(self, resource_name: str): return self._isdir(self._fn(self.module_path, resource_name)) - def metadata_isdir(self, name) -> bool: + def metadata_isdir(self, name: str) -> bool: return bool(self.egg_info and self._isdir(self._fn(self.egg_info, name))) - def resource_listdir(self, resource_name): + def resource_listdir(self, resource_name: str): return self._listdir(self._fn(self.module_path, resource_name)) - def metadata_listdir(self, name): + def metadata_listdir(self, name: str) -> list[str]: if self.egg_info: return self._listdir(self._fn(self.egg_info, name)) return [] - def run_script(self, script_name, namespace): + def run_script(self, script_name: str, namespace: dict[str, Any]): script = 'scripts/' + script_name if not self.has_metadata(script): raise ResolutionError( @@ -1519,13 +1704,13 @@ def run_script(self, script_name, namespace): **locals() ), ) + script_text = self.get_metadata(script).replace('\r\n', '\n') script_text = script_text.replace('\r', '\n') script_filename = self._fn(self.egg_info, script) namespace['__file__'] = script_filename if os.path.exists(script_filename): - with open(script_filename) as fid: - source = fid.read() + source = _read_utf8_with_fallback(script_filename) code = compile(source, script_filename, 'exec') exec(code, namespace, namespace) else: @@ -1550,12 +1735,16 @@ def _isdir(self, path) -> bool: "Can't perform this operation for unregistered loader type" ) - def _listdir(self, path): + def _listdir(self, path) -> list[str]: raise NotImplementedError( "Can't perform this operation for unregistered loader type" ) - def _fn(self, base, resource_name): + def _fn(self, base: str | None, resource_name: str): + if base is None: + raise TypeError( + "`base` parameter in `_fn` is `None`. Either override this method or check the parameter first." + ) self._validate_resource_path(resource_name) if resource_name: return os.path.join(base, *resource_name.split('/')) @@ -1618,6 +1807,7 @@ def _validate_resource_path(path): os.path.pardir in path.split(posixpath.sep) or posixpath.isabs(path) or ntpath.isabs(path) + or path.startswith("\\") ) if not invalid: return @@ -1625,7 +1815,7 @@ def _validate_resource_path(path): msg = "Use of .. or absolute path in a resource path is not allowed." # Aggressively disallow Windows absolute paths - if ntpath.isabs(path) and not posixpath.isabs(path): + if (path.startswith("\\") or ntpath.isabs(path)) and not posixpath.isabs(path): raise ValueError(msg) # for compatibility, warn; in future @@ -1636,8 +1826,9 @@ def _validate_resource_path(path): ) def _get(self, path) -> bytes: - if hasattr(self.loader, 'get_data'): - return self.loader.get_data(path) + if hasattr(self.loader, 'get_data') and self.loader: + # Already checked get_data exists + return self.loader.get_data(path) # type: ignore[attr-defined] raise NotImplementedError( "Can't perform this operation for loaders without 'get_data()'" ) @@ -1660,7 +1851,7 @@ def _parents(path): class EggProvider(NullProvider): """Provider based on a virtual filesystem""" - def __init__(self, module): + def __init__(self, module: _ModuleLike): super().__init__(module) self._setup_prefix() @@ -1671,7 +1862,7 @@ def _setup_prefix(self): egg = next(eggs, None) egg and self._set_egg(egg) - def _set_egg(self, path): + def _set_egg(self, path: str): self.egg_name = os.path.basename(path) self.egg_info = os.path.join(path, 'EGG-INFO') self.egg_root = path @@ -1689,7 +1880,7 @@ def _isdir(self, path) -> bool: def _listdir(self, path): return os.listdir(path) - def get_resource_stream(self, manager, resource_name): + def get_resource_stream(self, manager: object, resource_name: str): return open(self._fn(self.module_path, resource_name), 'rb') def _get(self, path) -> bytes: @@ -1713,7 +1904,8 @@ def _register(cls): class EmptyProvider(NullProvider): """Provider that returns nothing for all requests""" - module_path = None + # A special case, we don't want all Providers inheriting from NullProvider to have a potentially None module_path + module_path: str | None = None # type: ignore[assignment] _isdir = _has = lambda self, path: False @@ -1730,13 +1922,14 @@ def __init__(self): empty_provider = EmptyProvider() -class ZipManifests(dict): +class ZipManifests(Dict[str, "MemoizedZipManifests.manifest_mod"]): """ zip manifest builder """ + # `path` could be `StrPath | IO[bytes]` but that violates the LSP for `MemoizedZipManifests.load` @classmethod - def build(cls, path): + def build(cls, path: str): """ Build a dictionary similar to the zipimport directory caches, except instead of tuples, store ZipInfo objects. @@ -1762,9 +1955,11 @@ class MemoizedZipManifests(ZipManifests): Memoized zipfile manifests. """ - manifest_mod = collections.namedtuple('manifest_mod', 'manifest mtime') + class manifest_mod(NamedTuple): + manifest: dict[str, zipfile.ZipInfo] + mtime: float - def load(self, path): + def load(self, path: str) -> dict[str, zipfile.ZipInfo]: # type: ignore[override] # ZipManifests.load is a classmethod """ Load a manifest at path or return a suitable manifest already loaded. """ @@ -1781,10 +1976,12 @@ def load(self, path): class ZipProvider(EggProvider): """Resource support for zips and eggs""" - eagers = None + eagers: list[str] | None = None _zip_manifests = MemoizedZipManifests() + # ZipProvider's loader should always be a zipimporter or equivalent + loader: zipimport.zipimporter - def __init__(self, module): + def __init__(self, module: _ZipLoaderModule): super().__init__(module) self.zip_pre = self.loader.archive + os.sep @@ -1810,7 +2007,7 @@ def _parts(self, zip_path): def zipinfo(self): return self._zip_manifests.load(self.loader.archive) - def get_resource_filename(self, manager, resource_name): + def get_resource_filename(self, manager: ResourceManager, resource_name: str): if not self.egg_name: raise NotImplementedError( "resource_filename() only supported for .egg, not .zip" @@ -1833,7 +2030,7 @@ def _get_date_and_size(zip_stat): return timestamp, size # FIXME: 'ZipProvider._extract_resource' is too complex (12) - def _extract_resource(self, manager, zip_path): # noqa: C901 + def _extract_resource(self, manager: ResourceManager, zip_path) -> str: # noqa: C901 if zip_path in self._index(): for name in self._index()[zip_path]: last = self._extract_resource(manager, os.path.join(zip_path, name)) @@ -1844,9 +2041,13 @@ def _extract_resource(self, manager, zip_path): # noqa: C901 if not WRITE_SUPPORT: raise OSError( - '"os.rename" and "os.unlink" are not supported ' 'on this platform' + '"os.rename" and "os.unlink" are not supported on this platform' ) try: + if not self.egg_name: + raise OSError( + '"egg_name" is empty. This likely means no egg could be found from the "module_path".' + ) real_path = manager.get_cache_path(self.egg_name, self._parts(zip_path)) if self._is_current(real_path, zip_path): @@ -1935,10 +2136,10 @@ def _isdir(self, fspath) -> bool: def _listdir(self, fspath): return list(self._index().get(self._zipinfo_name(fspath), ())) - def _eager_to_zip(self, resource_name): + def _eager_to_zip(self, resource_name: str): return self._zipinfo_name(self._fn(self.egg_root, resource_name)) - def _resource_to_zip(self, resource_name): + def _resource_to_zip(self, resource_name: str): return self._zipinfo_name(self._fn(self.module_path, resource_name)) @@ -1957,16 +2158,16 @@ class FileMetadata(EmptyProvider): the provided location. """ - def __init__(self, path): + def __init__(self, path: StrPath): self.path = path def _get_metadata_path(self, name): return self.path - def has_metadata(self, name) -> bool: + def has_metadata(self, name: str) -> bool: return name == 'PKG-INFO' and os.path.isfile(self.path) - def get_metadata(self, name): + def get_metadata(self, name: str): if name != 'PKG-INFO': raise KeyError("No metadata except PKG-INFO is available") @@ -1982,7 +2183,7 @@ def _warn_on_replacement(self, metadata): msg = tmpl.format(**locals()) warnings.warn(msg) - def get_metadata_lines(self, name): + def get_metadata_lines(self, name: str) -> Iterator[str]: return yield_lines(self.get_metadata(name)) @@ -2006,7 +2207,7 @@ class PathMetadata(DefaultProvider): dist = Distribution.from_filename(egg_path, metadata=metadata) """ - def __init__(self, path, egg_info): + def __init__(self, path: str, egg_info: str): self.module_path = path self.egg_info = egg_info @@ -2014,7 +2215,7 @@ def __init__(self, path, egg_info): class EggMetadata(ZipProvider): """Metadata provider for .egg files""" - def __init__(self, importer): + def __init__(self, importer: zipimport.zipimporter): """Create a metadata provider from a zipimporter""" self.zip_pre = importer.archive + os.sep @@ -2026,10 +2227,12 @@ def __init__(self, importer): self._setup_prefix() -_declare_state('dict', _distribution_finders={}) +_distribution_finders: dict[type, _DistFinderType[Any]] = _declare_state( + 'dict', '_distribution_finders', {} +) -def register_finder(importer_type, distribution_finder): +def register_finder(importer_type: type[_T], distribution_finder: _DistFinderType[_T]): """Register `distribution_finder` to find distributions in sys.path items `importer_type` is the type or class of a PEP 302 "Importer" (sys.path item @@ -2039,14 +2242,16 @@ def register_finder(importer_type, distribution_finder): _distribution_finders[importer_type] = distribution_finder -def find_distributions(path_item, only=False): +def find_distributions(path_item: str, only: bool = False): """Yield distributions accessible via `path_item`""" importer = get_importer(path_item) finder = _find_adapter(_distribution_finders, importer) return finder(importer, path_item, only) -def find_eggs_in_zip(importer, path_item, only=False): +def find_eggs_in_zip( + importer: zipimport.zipimporter, path_item: str, only: bool = False +) -> Iterator[Distribution]: """ Find eggs in zip files; possibly multiple nested eggs. """ @@ -2075,14 +2280,16 @@ def find_eggs_in_zip(importer, path_item, only=False): register_finder(zipimport.zipimporter, find_eggs_in_zip) -def find_nothing(importer, path_item, only=False): +def find_nothing( + importer: object | None, path_item: str | None, only: bool | None = False +): return () register_finder(object, find_nothing) -def find_on_path(importer, path_item, only=False): +def find_on_path(importer: object | None, path_item, only=False): """Yield distributions accessible on a sys.path directory""" path_item = _normalize_cached(path_item) @@ -2137,7 +2344,7 @@ def __call__(self, fullpath): return iter(()) -def safe_listdir(path): +def safe_listdir(path: StrOrBytesPath): """ Attempt to list contents of path, but suppress some exceptions. """ @@ -2153,13 +2360,13 @@ def safe_listdir(path): return () -def distributions_from_metadata(path): +def distributions_from_metadata(path: str): root = os.path.dirname(path) if os.path.isdir(path): if len(os.listdir(path)) == 0: # empty metadata dir; skip return - metadata = PathMetadata(root, path) + metadata: _MetadataType = PathMetadata(root, path) else: metadata = FileMetadata(path) entry = os.path.basename(path) @@ -2175,11 +2382,10 @@ def non_empty_lines(path): """ Yield non-empty lines from file at path """ - with open(path) as f: - for line in f: - line = line.strip() - if line: - yield line + for line in _read_utf8_with_fallback(path).splitlines(): + line = line.strip() + if line: + yield line def resolve_egg_link(path): @@ -2200,11 +2406,17 @@ def resolve_egg_link(path): register_finder(importlib.machinery.FileFinder, find_on_path) -_declare_state('dict', _namespace_handlers={}) -_declare_state('dict', _namespace_packages={}) +_namespace_handlers: dict[type, _NSHandlerType[Any]] = _declare_state( + 'dict', '_namespace_handlers', {} +) +_namespace_packages: dict[str | None, list[str]] = _declare_state( + 'dict', '_namespace_packages', {} +) -def register_namespace_handler(importer_type, namespace_handler): +def register_namespace_handler( + importer_type: type[_T], namespace_handler: _NSHandlerType[_T] +): """Register `namespace_handler` to declare namespace packages `importer_type` is the type or class of a PEP 302 "Importer" (sys.path item @@ -2259,7 +2471,7 @@ def _handle_ns(packageName, path_item): return subpath -def _rebuild_mod_path(orig_path, package_name, module): +def _rebuild_mod_path(orig_path, package_name, module: types.ModuleType): """ Rebuild module.__path__ ensuring that all entries are ordered corresponding to their sys.path order @@ -2293,7 +2505,7 @@ def position_in_sys_path(path): module.__path__ = new_path -def declare_namespace(packageName): +def declare_namespace(packageName: str): """Declare that package 'packageName' is a namespace package""" msg = ( @@ -2310,7 +2522,7 @@ def declare_namespace(packageName): if packageName in _namespace_packages: return - path = sys.path + path: MutableSequence[str] = sys.path parent, _, _ = packageName.rpartition('.') if parent: @@ -2336,7 +2548,7 @@ def declare_namespace(packageName): _imp.release_lock() -def fixup_namespace_packages(path_item, parent=None): +def fixup_namespace_packages(path_item: str, parent: str | None = None): """Ensure that previously-declared namespace packages include path_item""" _imp.acquire_lock() try: @@ -2348,7 +2560,12 @@ def fixup_namespace_packages(path_item, parent=None): _imp.release_lock() -def file_ns_handler(importer, path_item, packageName, module): +def file_ns_handler( + importer: object, + path_item: StrPath, + packageName: str, + module: types.ModuleType, +): """Compute an ns-package subpath for a filesystem or zipfile importer""" subpath = os.path.join(path_item, packageName.split('.')[-1]) @@ -2368,19 +2585,28 @@ def file_ns_handler(importer, path_item, packageName, module): register_namespace_handler(importlib.machinery.FileFinder, file_ns_handler) -def null_ns_handler(importer, path_item, packageName, module): +def null_ns_handler( + importer: object, + path_item: str | None, + packageName: str | None, + module: _ModuleLike | None, +): return None register_namespace_handler(object, null_ns_handler) -def normalize_path(filename): +@overload +def normalize_path(filename: StrPath) -> str: ... +@overload +def normalize_path(filename: BytesPath) -> bytes: ... +def normalize_path(filename: StrOrBytesPath): """Normalize a file/dir name for comparison purposes""" return os.path.normcase(os.path.realpath(os.path.normpath(_cygwin_patch(filename)))) -def _cygwin_patch(filename): # pragma: nocover +def _cygwin_patch(filename: StrOrBytesPath): # pragma: nocover """ Contrary to POSIX 2008, on Cygwin, getcwd (3) contains symlink components. Using @@ -2391,9 +2617,19 @@ def _cygwin_patch(filename): # pragma: nocover return os.path.abspath(filename) if sys.platform == 'cygwin' else filename -@functools.lru_cache(maxsize=None) -def _normalize_cached(filename): - return normalize_path(filename) +if TYPE_CHECKING: + # https://github.com/python/mypy/issues/16261 + # https://github.com/python/typeshed/issues/6347 + @overload + def _normalize_cached(filename: StrPath) -> str: ... + @overload + def _normalize_cached(filename: BytesPath) -> bytes: ... + def _normalize_cached(filename: StrOrBytesPath) -> str | bytes: ... +else: + + @functools.lru_cache(maxsize=None) + def _normalize_cached(filename): + return normalize_path(filename) def _is_egg_path(path): @@ -2446,7 +2682,14 @@ def _set_parent_ns(packageName): class EntryPoint: """Object representing an advertised importable object""" - def __init__(self, name, module_name, attrs=(), extras=(), dist=None): + def __init__( + self, + name: str, + module_name: str, + attrs: Iterable[str] = (), + extras: Iterable[str] = (), + dist: Distribution | None = None, + ): if not MODULE(module_name): raise ValueError("Invalid module name", module_name) self.name = name @@ -2466,7 +2709,26 @@ def __str__(self): def __repr__(self): return "EntryPoint.parse(%r)" % str(self) - def load(self, require=True, *args, **kwargs): + @overload + def load( + self, + require: Literal[True] = True, + env: Environment | None = None, + installer: _InstallerType | None = None, + ) -> _ResolvedEntryPoint: ... + @overload + def load( + self, + require: Literal[False], + *args: Any, + **kwargs: Any, + ) -> _ResolvedEntryPoint: ... + def load( + self, + require: bool = True, + *args: Environment | _InstallerType | None, + **kwargs: Environment | _InstallerType | None, + ) -> _ResolvedEntryPoint: """ Require packages for this EntryPoint, then resolve it. """ @@ -2478,10 +2740,12 @@ def load(self, require=True, *args, **kwargs): stacklevel=2, ) if require: - self.require(*args, **kwargs) + # We could pass `env` and `installer` directly, + # but keeping `*args` and `**kwargs` for backwards compatibility + self.require(*args, **kwargs) # type: ignore return self.resolve() - def resolve(self): + def resolve(self) -> _ResolvedEntryPoint: """ Resolve the entry point from its module and attrs. """ @@ -2491,9 +2755,14 @@ def resolve(self): except AttributeError as exc: raise ImportError(str(exc)) from exc - def require(self, env=None, installer=None): - if self.extras and not self.dist: - raise UnknownExtra("Can't require() without a distribution", self) + def require( + self, + env: Environment | None = None, + installer: _InstallerType | None = None, + ): + if not self.dist: + error_cls = UnknownExtra if self.extras else AttributeError + raise error_cls("Can't require() without a distribution", self) # Get the requirements for this entry point with all its extras and # then resolve them. We have to pass `extras` along when resolving so @@ -2514,7 +2783,7 @@ def require(self, env=None, installer=None): ) @classmethod - def parse(cls, src, dist=None): + def parse(cls, src: str, dist: Distribution | None = None): """Parse a single entry point from string `src` Entry point syntax follows the form:: @@ -2539,15 +2808,20 @@ def _parse_extras(cls, extras_spec): return () req = Requirement.parse('x' + extras_spec) if req.specs: - raise ValueError() + raise ValueError return req.extras @classmethod - def parse_group(cls, group, lines, dist=None): + def parse_group( + cls, + group: str, + lines: _NestedStr, + dist: Distribution | None = None, + ): """Parse an entry point group""" if not MODULE(group): raise ValueError("Invalid group name", group) - this = {} + this: dict[str, Self] = {} for line in yield_lines(lines): ep = cls.parse(line, dist) if ep.name in this: @@ -2556,14 +2830,19 @@ def parse_group(cls, group, lines, dist=None): return this @classmethod - def parse_map(cls, data, dist=None): + def parse_map( + cls, + data: str | Iterable[str] | dict[str, str | Iterable[str]], + dist: Distribution | None = None, + ): """Parse a map of entry point groups""" + _data: Iterable[tuple[str | None, str | Iterable[str]]] if isinstance(data, dict): - data = data.items() + _data = data.items() else: - data = split_sections(data) - maps = {} - for group, lines in data: + _data = split_sections(data) + maps: dict[str, dict[str, Self]] = {} + for group, lines in _data: if group is None: if not lines: continue @@ -2597,13 +2876,13 @@ class Distribution: def __init__( self, - location=None, - metadata=None, - project_name=None, - version=None, - py_version=PY_MAJOR, - platform=None, - precedence=EGG_DIST, + location: str | None = None, + metadata: _MetadataType = None, + project_name: str | None = None, + version: str | None = None, + py_version: str | None = PY_MAJOR, + platform: str | None = None, + precedence: int = EGG_DIST, ): self.project_name = safe_name(project_name or 'Unknown') if version is not None: @@ -2615,7 +2894,13 @@ def __init__( self._provider = metadata or empty_provider @classmethod - def from_location(cls, location, basename, metadata=None, **kw): + def from_location( + cls, + location: str, + basename: StrPath, + metadata: _MetadataType = None, + **kw: int, # We could set `precedence` explicitly, but keeping this as `**kw` for full backwards and subclassing compatibility + ) -> Distribution: project_name, version, py_version, platform = [None] * 4 basename, ext = os.path.splitext(basename) if ext.lower() in _distributionImpl: @@ -2653,25 +2938,25 @@ def hashcmp(self): def __hash__(self): return hash(self.hashcmp) - def __lt__(self, other): + def __lt__(self, other: Distribution): return self.hashcmp < other.hashcmp - def __le__(self, other): + def __le__(self, other: Distribution): return self.hashcmp <= other.hashcmp - def __gt__(self, other): + def __gt__(self, other: Distribution): return self.hashcmp > other.hashcmp - def __ge__(self, other): + def __ge__(self, other: Distribution): return self.hashcmp >= other.hashcmp - def __eq__(self, other): + def __eq__(self, other: object): if not isinstance(other, self.__class__): # It's not a Distribution, so they are not equal return False return self.hashcmp == other.hashcmp - def __ne__(self, other): + def __ne__(self, other: object): return not self == other # These properties have to be lazy so that we don't have to load any @@ -2691,12 +2976,12 @@ def parsed_version(self): if not hasattr(self, "_parsed_version"): try: self._parsed_version = parse_version(self.version) - except packaging.version.InvalidVersion as ex: + except _packaging_version.InvalidVersion as ex: info = f"(package: {self.project_name})" if hasattr(ex, "add_note"): ex.add_note(info) # PEP 678 raise - raise packaging.version.InvalidVersion(f"{str(ex)} {info}") from None + raise _packaging_version.InvalidVersion(f"{str(ex)} {info}") from None return self._parsed_version @@ -2704,7 +2989,7 @@ def parsed_version(self): def _forgiving_parsed_version(self): try: return self.parsed_version - except packaging.version.InvalidVersion as ex: + except _packaging_version.InvalidVersion as ex: self._parsed_version = parse_version(_forgiving_version(self.version)) notes = "\n".join(getattr(ex, "__notes__", [])) # PEP 678 @@ -2754,14 +3039,14 @@ def _dep_map(self): return self.__dep_map @staticmethod - def _filter_extras(dm): + def _filter_extras(dm: dict[str | None, list[Requirement]]): """ Given a mapping of extras to dependencies, strip off environment markers and filter out any dependencies not matching the markers. """ for extra in list(filter(None, dm)): - new_extra = extra + new_extra: str | None = extra reqs = dm.pop(extra) new_extra, _, marker = extra.partition(':') fails_marker = marker and ( @@ -2781,10 +3066,10 @@ def _build_dep_map(self): dm.setdefault(extra, []).extend(parse_requirements(reqs)) return dm - def requires(self, extras=()): + def requires(self, extras: Iterable[str] = ()): """List of Requirements needed for this distro if `extras` are used""" dm = self._dep_map - deps = [] + deps: list[Requirement] = [] deps.extend(dm.get(None, ())) for ext in extras: try: @@ -2820,12 +3105,12 @@ def _get_version(self): lines = self._get_metadata(self.PKG_INFO) return _version_from_file(lines) - def activate(self, path=None, replace=False): + def activate(self, path: list[str] | None = None, replace: bool = False): """Ensure distribution is importable on `path` (default=sys.path)""" if path is None: path = sys.path self.insert_on(path, replace=replace) - if path is sys.path: + if path is sys.path and self.location is not None: fixup_namespace_packages(self.location) for pkg in self._get_metadata('namespace_packages.txt'): if pkg in sys.modules: @@ -2870,45 +3155,57 @@ def __dir__(self): ) @classmethod - def from_filename(cls, filename, metadata=None, **kw): + def from_filename( + cls, + filename: StrPath, + metadata: _MetadataType = None, + **kw: int, # We could set `precedence` explicitly, but keeping this as `**kw` for full backwards and subclassing compatibility + ): return cls.from_location( _normalize_cached(filename), os.path.basename(filename), metadata, **kw ) def as_requirement(self): """Return a ``Requirement`` that matches this distribution exactly""" - if isinstance(self.parsed_version, packaging.version.Version): + if isinstance(self.parsed_version, _packaging_version.Version): spec = "%s==%s" % (self.project_name, self.parsed_version) else: spec = "%s===%s" % (self.project_name, self.parsed_version) return Requirement.parse(spec) - def load_entry_point(self, group, name): + def load_entry_point(self, group: str, name: str) -> _ResolvedEntryPoint: """Return the `name` entry point of `group` or raise ImportError""" ep = self.get_entry_info(group, name) if ep is None: raise ImportError("Entry point %r not found" % ((group, name),)) return ep.load() - def get_entry_map(self, group=None): + @overload + def get_entry_map(self, group: None = None) -> dict[str, dict[str, EntryPoint]]: ... + @overload + def get_entry_map(self, group: str) -> dict[str, EntryPoint]: ... + def get_entry_map(self, group: str | None = None): """Return the entry point map for `group`, or the full entry map""" - try: - ep_map = self._ep_map - except AttributeError: - ep_map = self._ep_map = EntryPoint.parse_map( + if not hasattr(self, "_ep_map"): + self._ep_map = EntryPoint.parse_map( self._get_metadata('entry_points.txt'), self ) if group is not None: - return ep_map.get(group, {}) - return ep_map + return self._ep_map.get(group, {}) + return self._ep_map - def get_entry_info(self, group, name): + def get_entry_info(self, group: str, name: str): """Return the EntryPoint object for `group`+`name`, or ``None``""" return self.get_entry_map(group).get(name) # FIXME: 'Distribution.insert_on' is too complex (13) - def insert_on(self, path, loc=None, replace=False): # noqa: C901 + def insert_on( # noqa: C901 + self, + path: list[str], + loc=None, + replace: bool = False, + ): """Ensure self.location is on path If replace=False (default): @@ -3013,13 +3310,14 @@ def has_version(self): return False return True - def clone(self, **kw): + def clone(self, **kw: str | int | IResourceProvider | None): """Copy this distribution, substituting in any changed keyword args""" names = 'project_name version py_version platform location precedence' for attr in names.split(): kw.setdefault(attr, getattr(self, attr, None)) kw.setdefault('metadata', self._provider) - return self.__class__(**kw) + # Unsafely unpacking. But keeping **kw for backwards and subclassing compatibility + return self.__class__(**kw) # type:ignore[arg-type] @property def extras(self): @@ -3072,11 +3370,11 @@ def _dep_map(self): self.__dep_map = self._compute_dependencies() return self.__dep_map - def _compute_dependencies(self): + def _compute_dependencies(self) -> dict[str | None, list[Requirement]]: """Recompute this distribution's dependencies.""" - dm = self.__dep_map = {None: []} + self.__dep_map: dict[str | None, list[Requirement]] = {None: []} - reqs = [] + reqs: list[Requirement] = [] # Including any condition expressions for req in self._parsed_pkg_info.get_all('Requires-Dist') or []: reqs.extend(parse_requirements(req)) @@ -3087,13 +3385,15 @@ def reqs_for_extra(extra): yield req common = types.MappingProxyType(dict.fromkeys(reqs_for_extra(None))) - dm[None].extend(common) + self.__dep_map[None].extend(common) for extra in self._parsed_pkg_info.get_all('Provides-Extra') or []: s_extra = safe_extra(extra.strip()) - dm[s_extra] = [r for r in reqs_for_extra(extra) if r not in common] + self.__dep_map[s_extra] = [ + r for r in reqs_for_extra(extra) if r not in common + ] - return dm + return self.__dep_map _distributionImpl = { @@ -3116,7 +3416,7 @@ def issue_warning(*args, **kw): warnings.warn(stacklevel=level + 1, *args, **kw) -def parse_requirements(strs): +def parse_requirements(strs: _NestedStr): """ Yield ``Requirement`` objects for each specification in `strs`. @@ -3125,19 +3425,20 @@ def parse_requirements(strs): return map(Requirement, join_continuation(map(drop_comment, yield_lines(strs)))) -class RequirementParseError(packaging.requirements.InvalidRequirement): +class RequirementParseError(_packaging_requirements.InvalidRequirement): "Compatibility wrapper for InvalidRequirement" -class Requirement(packaging.requirements.Requirement): - def __init__(self, requirement_string): +class Requirement(_packaging_requirements.Requirement): + def __init__(self, requirement_string: str): """DO NOT CALL THIS UNDOCUMENTED METHOD; use Requirement.parse()!""" super().__init__(requirement_string) self.unsafe_name = self.name project_name = safe_name(self.name) self.project_name, self.key = project_name, project_name.lower() self.specs = [(spec.operator, spec.version) for spec in self.specifier] - self.extras = tuple(map(safe_extra, self.extras)) + # packaging.requirements.Requirement uses a set for its extras. We use a variable-length tuple + self.extras: tuple[str] = tuple(map(safe_extra, self.extras)) self.hashCmp = ( self.key, self.url, @@ -3147,13 +3448,13 @@ def __init__(self, requirement_string): ) self.__hash = hash(self.hashCmp) - def __eq__(self, other): + def __eq__(self, other: object): return isinstance(other, Requirement) and self.hashCmp == other.hashCmp def __ne__(self, other): return not self == other - def __contains__(self, item): + def __contains__(self, item: Distribution | str | tuple[str, ...]) -> bool: if isinstance(item, Distribution): if item.key != self.key: return False @@ -3172,7 +3473,7 @@ def __repr__(self): return "Requirement.parse(%r)" % str(self) @staticmethod - def parse(s): + def parse(s: str | Iterable[str]): (req,) = parse_requirements(s) return req @@ -3187,7 +3488,7 @@ def _always_object(classes): return classes -def _find_adapter(registry, ob): +def _find_adapter(registry: Mapping[type, _AdapterT], ob: object) -> _AdapterT: """Return an adapter factory for `ob` from `registry`""" types = _always_object(inspect.getmro(getattr(ob, '__class__', type(ob)))) for t in types: @@ -3198,7 +3499,7 @@ def _find_adapter(registry, ob): raise TypeError(f"Could not find adapter for {registry} and {ob}") -def ensure_directory(path): +def ensure_directory(path: StrOrBytesPath): """Ensure that the parent directory of `path` exists""" dirname = os.path.dirname(path) os.makedirs(dirname, exist_ok=True) @@ -3217,7 +3518,7 @@ def _bypass_ensure_directory(path): pass -def split_sections(s): +def split_sections(s: _NestedStr) -> Iterator[tuple[str | None, list[str]]]: """Split a string or iterable thereof into (section, content) pairs Each ``section`` is a stripped version of the section header ("[section]") @@ -3261,6 +3562,47 @@ def _mkstemp(*args, **kw): warnings.filterwarnings("ignore", category=PEP440Warning, append=True) +class PkgResourcesDeprecationWarning(Warning): + """ + Base class for warning about deprecations in ``pkg_resources`` + + This class is not derived from ``DeprecationWarning``, and as such is + visible by default. + """ + + +# Ported from ``setuptools`` to avoid introducing an import inter-dependency: +_LOCALE_ENCODING = "locale" if sys.version_info >= (3, 10) else None + + +def _read_utf8_with_fallback(file: str, fallback_encoding=_LOCALE_ENCODING) -> str: + """See setuptools.unicode_utils._read_utf8_with_fallback""" + try: + with open(file, "r", encoding="utf-8") as f: + return f.read() + except UnicodeDecodeError: # pragma: no cover + msg = f"""\ + ******************************************************************************** + `encoding="utf-8"` fails with {file!r}, trying `encoding={fallback_encoding!r}`. + + This fallback behaviour is considered **deprecated** and future versions of + `setuptools/pkg_resources` may not implement it. + + Please encode {file!r} with "utf-8" to ensure future builds will succeed. + + If this file was produced by `setuptools` itself, cleaning up the cached files + and re-building/re-installing the package with a newer version of `setuptools` + (e.g. by updating `build-system.requires` in its `pyproject.toml`) + might solve the problem. + ******************************************************************************** + """ + # TODO: Add a deadline? + # See comment in setuptools.unicode_utils._Utf8EncodingNeeded + warnings.warn(msg, PkgResourcesDeprecationWarning, stacklevel=2) + with open(file, "r", encoding=fallback_encoding) as f: + return f.read() + + # from jaraco.functools 1.3 def _call_aside(f, *args, **kwargs): f(*args, **kwargs) @@ -3279,15 +3621,6 @@ def _initialize(g=globals()): ) -class PkgResourcesDeprecationWarning(Warning): - """ - Base class for warning about deprecations in ``pkg_resources`` - - This class is not derived from ``DeprecationWarning``, and as such is - visible by default. - """ - - @_call_aside def _initialize_master_working_set(): """ @@ -3301,8 +3634,7 @@ def _initialize_master_working_set(): Invocation by other packages is unsupported and done at their own risk. """ - working_set = WorkingSet._build_master() - _declare_state('object', working_set=working_set) + working_set = _declare_state('object', 'working_set', WorkingSet._build_master()) require = working_set.require iter_entry_points = working_set.iter_entry_points @@ -3323,3 +3655,23 @@ def _initialize_master_working_set(): # match order list(map(working_set.add_entry, sys.path)) globals().update(locals()) + + +if TYPE_CHECKING: + # All of these are set by the @_call_aside methods above + __resource_manager = ResourceManager() # Won't exist at runtime + resource_exists = __resource_manager.resource_exists + resource_isdir = __resource_manager.resource_isdir + resource_filename = __resource_manager.resource_filename + resource_stream = __resource_manager.resource_stream + resource_string = __resource_manager.resource_string + resource_listdir = __resource_manager.resource_listdir + set_extraction_path = __resource_manager.set_extraction_path + cleanup_resources = __resource_manager.cleanup_resources + + working_set = WorkingSet() + require = working_set.require + iter_entry_points = working_set.iter_entry_points + add_activation_listener = working_set.subscribe + run_script = working_set.run_script + run_main = run_script diff --git a/src/pip/_vendor/vendor.txt b/src/pip/_vendor/vendor.txt index 49435b8c224..34c7875e765 100644 --- a/src/pip/_vendor/vendor.txt +++ b/src/pip/_vendor/vendor.txt @@ -13,7 +13,7 @@ rich==13.7.1 pygments==2.18.0 typing_extensions==4.12.2 resolvelib==1.0.1 -setuptools==69.5.1 +setuptools==70.2.0 tenacity==8.2.3 tomli==2.0.1 truststore==0.9.1 diff --git a/tools/vendoring/patches/pkg_resources.patch b/tools/vendoring/patches/pkg_resources.patch index a99b6c63df8..946c1eb9cea 100644 --- a/tools/vendoring/patches/pkg_resources.patch +++ b/tools/vendoring/patches/pkg_resources.patch @@ -1,11 +1,11 @@ diff --git a/src/pip/_vendor/pkg_resources/__init__.py b/src/pip/_vendor/pkg_resources/__init__.py -index 3f2476a0c..8d5727d35 100644 +index d47df3f3c..1ee0d2cf9 100644 --- a/src/pip/_vendor/pkg_resources/__init__.py +++ b/src/pip/_vendor/pkg_resources/__init__.py -@@ -65,7 +65,7 @@ - from os import open as os_open - from os.path import isdir, split - +@@ -87,7 +87,7 @@ except ImportError: + # no write support, probably under GAE + WRITE_SUPPORT = False + -from pkg_resources.extern.jaraco.text import ( +from pip._internal.utils._jaraco_text import ( yield_lines,