-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathwalker.py
290 lines (235 loc) · 10.4 KB
/
walker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
from __future__ import annotations
from typing import TYPE_CHECKING
from packaging.utils import canonicalize_name
from poetry.core.constraints.version.util import constraint_regions
from poetry.core.version.markers import AnyMarker
from poetry.core.version.markers import SingleMarker
from poetry.packages import DependencyPackage
from poetry.utils.extras import get_extra_package_names
if TYPE_CHECKING:
from collections.abc import Collection
from collections.abc import Iterable
from collections.abc import Iterator
from packaging.utils import NormalizedName
from poetry.core.packages.dependency import Dependency
from poetry.core.packages.package import Package
from poetry.core.version.markers import BaseMarker
from poetry.packages import Locker
def get_python_version_region_markers(packages: list[Package]) -> list[BaseMarker]:
    """
    Build one environment marker per Python-version region of the given packages.

    The python constraints of ``packages`` are handed to ``constraint_regions``;
    every region it returns is translated into a marker that starts out as
    ``AnyMarker`` and is narrowed by the region's lower and/or upper bound when
    that bound is set.

    :param packages: packages whose ``python_constraint`` values define the regions.
    :return: the markers, in the order the regions were returned.
    """

    def bound_marker(comparison: str, version) -> SingleMarker:
        # Bounds with a precision above two use the full version marker name;
        # anything coarser is expressed with ``python_version``.
        name = "python_full_version" if version.precision > 2 else "python_version"
        return SingleMarker(name, f"{comparison} {version}")

    python_constraints = [package.python_constraint for package in packages]

    region_markers = []
    for region in constraint_regions(python_constraints):
        region_marker: BaseMarker = AnyMarker()

        if region.min is not None:
            lower_comparison = ">=" if region.include_min else ">"
            region_marker = region_marker.intersect(
                bound_marker(lower_comparison, region.min)
            )

        if region.max is not None:
            upper_comparison = "<=" if region.include_max else "<"
            region_marker = region_marker.intersect(
                bound_marker(upper_comparison, region.max)
            )

        region_markers.append(region_marker)

    return region_markers
def get_project_dependency_packages(
    locker: Locker,
    project_requires: list[Dependency],
    root_package_name: NormalizedName,
    project_python_marker: BaseMarker | None = None,
    extras: Collection[NormalizedName] = (),
) -> Iterator[DependencyPackage]:
    """
    Yield the locked packages needed by the project's requirements.

    :param locker: provides the locked repository and the raw lock data.
    :param project_requires: the project's direct requirements.
    :param root_package_name: name of the project itself, forwarded to the
        dependency walk.
    :param project_python_marker: when given, every requirement is cloned and
        its marker is intersected with this marker before the walk.
    :param extras: the extras selected by the caller.
    :return: an iterator of ``DependencyPackage`` objects, one per
        (package, dependency) pair produced by ``get_project_dependencies``.
    """
    # Narrow every requirement to the project's python marker. Clones are used
    # so the dependencies passed in by the caller are not modified.
    if project_python_marker is not None:
        narrowed_requires: list[Dependency] = []
        for original in project_requires:
            narrowed = original.clone()
            narrowed.marker = narrowed.marker.intersect(project_python_marker)
            narrowed_requires.append(narrowed)
        project_requires = narrowed_requires

    repository = locker.locked_repository()

    # Canonicalize the extras table of the lock file, then work out which
    # package names the selected extras pull in.
    locked_extras = {}
    for extra, dependencies in locker.lock_data.get("extras", {}).items():
        locked_extras[canonicalize_name(extra)] = [
            canonicalize_name(dependency) for dependency in dependencies
        ]
    extra_package_names = get_extra_package_names(
        repository.packages,
        locked_extras,
        extras,
    )

    # Keep only requirements that resolve to a locked package which is either
    # not optional or has been activated through the selected extras.
    selected = []
    for dependency in project_requires:
        try:
            package = repository.find_packages(dependency=dependency)[0]
        except IndexError:
            # nothing in the lock file matches this requirement
            continue

        activated = not package.optional or package.name in extra_package_names
        if activated:
            selected.append(dependency)

    resolved = get_project_dependencies(
        project_requires=selected,
        locked_packages=repository.packages,
        root_package_name=root_package_name,
    )
    for package, dependency in resolved:
        yield DependencyPackage(dependency=dependency, package=package)
def get_project_dependencies(
    project_requires: list[Dependency],
    locked_packages: list[Package],
    root_package_name: NormalizedName,
) -> Iterable[tuple[Package, Dependency]]:
    """
    Resolve the project requirements against the locked packages.

    :param project_requires: the requirements the walk starts from.
    :param locked_packages: every package recorded in the lock file.
    :param root_package_name: name of the project itself, forwarded to
        ``walk_dependencies``.
    :return: the ``(package, dependency)`` items of the mapping built by
        ``walk_dependencies``.
    """
    # A name can be locked more than once (requirements may use different
    # constraints), so index the locked packages by name.
    packages_by_name: dict[str, list[Package]] = {}
    for locked in locked_packages:
        packages_by_name.setdefault(locked.name, []).append(locked)

    # Newest version first, so that it is the preferred candidate.
    for same_name_packages in packages_by_name.values():
        same_name_packages.sort(
            key=lambda candidate: candidate.version,
            reverse=True,
        )

    return walk_dependencies(
        dependencies=project_requires,
        packages_by_name=packages_by_name,
        root_package_name=root_package_name,
    ).items()
def walk_dependencies(
    dependencies: list[Dependency],
    packages_by_name: dict[str, list[Package]],
    root_package_name: NormalizedName,
) -> dict[Package, Dependency]:
    """
    Walk the requirements and map every locked package reached to a dependency.

    Requirements are processed in first-in, first-out order. For each one the
    matching locked package is looked up with ``get_locked_package``; the
    package's own requirements are then queued, split per Python-version region
    of their candidate packages.

    :param dependencies: the requirements to start from. The sequence is copied
        into an internal queue and is not modified.
    :param packages_by_name: locked packages indexed by name.
    :param root_package_name: requirements with this name are skipped.
    :return: a mapping from locked package to the dependency that selected it;
        when a package is reached more than once, the markers are unioned.
    :raises RuntimeError: if no locked package is found for a requirement.
        Exceptions raised by ``get_locked_package`` propagate unchanged.
    """
    # Function-scope import: keeps this function self-contained.
    from collections import deque

    nested_dependencies: dict[Package, Dependency] = {}

    visited: set[tuple[Dependency, BaseMarker]] = set()

    # A private FIFO queue gives O(1) pops from the front (``list.pop(0)`` is
    # O(n)) and leaves the caller's list untouched.
    pending = deque(dependencies)

    while pending:
        requirement = pending.popleft()
        if (requirement, requirement.marker) in visited:
            continue
        if requirement.name == root_package_name:
            continue
        visited.add((requirement, requirement.marker))

        locked_package = get_locked_package(
            requirement, packages_by_name, nested_dependencies
        )

        if not locked_package:
            raise RuntimeError(f"Dependency walk failed at {requirement}")

        if requirement.extras:
            locked_package = locked_package.with_features(requirement.extras)

        # create dependency from locked package to retain dependency metadata
        # if this is not done, we can end-up with incorrect nested dependencies
        constraint = requirement.constraint
        marker = requirement.marker
        requirement = locked_package.to_dependency()
        requirement.marker = requirement.marker.intersect(marker)

        requirement.constraint = constraint

        for require in locked_package.requires:
            # Skip optional requirements that none of the package's active
            # features list among their extras.
            if require.is_optional() and not any(
                require in locked_package.extras.get(feature, ())
                for feature in locked_package.features
            ):
                continue

            base_marker = require.marker.intersect(requirement.marker).without_extras()

            if not base_marker.is_empty():
                # So as to give ourselves enough flexibility in choosing a solution,
                # we need to split the world up into the python version ranges that
                # this package might care about.
                #
                # We create a marker for all of the possible regions, and add a
                # requirement for each separately.
                candidates = packages_by_name.get(require.name, [])
                region_markers = get_python_version_region_markers(candidates)
                for region_marker in region_markers:
                    narrowed_marker = region_marker.intersect(base_marker)
                    if not narrowed_marker.is_empty():
                        require2 = require.clone()
                        require2.marker = narrowed_marker
                        pending.append(require2)

        key = locked_package

        if key not in nested_dependencies:
            nested_dependencies[key] = requirement
        else:
            # Reached again through another path: widen the recorded marker.
            nested_dependencies[key].marker = nested_dependencies[key].marker.union(
                requirement.marker
            )

    return nested_dependencies
def get_locked_package(
    dependency: Dependency,
    packages_by_name: dict[str, list[Package]],
    decided: dict[Package, Dependency] | None = None,
) -> Package | None:
    """
    Internal helper that picks the locked package matching a dependency.

    :param dependency: the requirement to satisfy.
    :param packages_by_name: locked packages indexed by name; the first
        compatible entry in a name's list is the one returned.
    :param decided: packages already chosen, mapped to the dependency that
        selected them.
    :return: the chosen package, or ``None`` when no candidate is compatible or
        when more than one earlier decision overlaps with ``dependency``.
    :raises DependencyWalkerError: when an earlier decision overlaps with
        ``dependency`` but the package chosen then is not compatible with it.
    """
    if not decided:
        decided = {}

    candidates = packages_by_name.get(dependency.name, [])

    def overlaps_previous_decision(package) -> bool:
        previous = decided.get(package)
        if previous is None:
            return False
        return not previous.marker.intersect(dependency.marker).is_empty()

    def satisfies_dependency(package) -> bool:
        if not package.python_constraint.allows_all(dependency.python_constraint):
            return False
        if not dependency.constraint.allows(package.version):
            return False
        return dependency.source_type is None or dependency.is_same_source_as(package)

    # A version chosen earlier whose marker overlaps with this requirement must
    # be reused, otherwise two versions of one package would be selected for
    # the same environment.
    overlapping_candidates = {
        package for package in candidates if overlaps_previous_decision(package)
    }

    # Several overlapping earlier decisions cannot be reconciled.
    if len(overlapping_candidates) > 1:
        return None

    compatible_candidates = [
        package for package in candidates if satisfies_dependency(package)
    ]

    if overlapping_candidates:
        # Only the previously elected package may be used.
        forced_candidates = [
            package
            for package in compatible_candidates
            if package in overlapping_candidates
        ]
        if not forced_candidates:
            # TODO: Support this case:
            # https://github.com/python-poetry/poetry-plugin-export/issues/183
            raise DependencyWalkerError(
                f"The `{dependency.name}` package has the following compatible"
                f" candidates `{compatible_candidates}`; but, the exporter dependency"
                f" walker previously elected `{overlapping_candidates.pop()}` which is"
                f" not compatible with the dependency `{dependency}`. Please contribute"
                " to `poetry-plugin-export` to solve this problem."
            )
        compatible_candidates = forced_candidates

    if compatible_candidates:
        return compatible_candidates[0]
    return None
def get_project_dependency_packages2(
    locker: Locker,
    project_python_marker: BaseMarker | None = None,
    groups: Collection[str] = (),
    extras: Collection[NormalizedName] = (),
) -> Iterator[DependencyPackage]:
    """
    Yield locked packages using the per-package information stored by the locker.

    :param locker: its ``locked_packages()`` mapping (package -> info) is iterated.
    :param project_python_marker: when given, each yielded package's marker is
        intersected with this marker.
    :param groups: only packages whose info shares at least one group with this
        collection are considered.
    :param extras: the selected extras; a package is skipped when its marker
        does not validate against ``{"extra": extras}``.
    :return: an iterator of ``DependencyPackage`` objects. Note that the
        ``marker`` attribute of each yielded package is overwritten.
    """
    for package, info in locker.locked_packages().items():
        # Skip packages that belong to none of the requested groups.
        if not info.groups.intersection(groups):
            continue

        marker = info.get_marker(groups)
        if not marker.validate({"extra": extras}):
            continue

        # Compare against None explicitly (as get_project_dependency_packages
        # does) so that a supplied marker is always applied, whatever its
        # truth value.
        if project_python_marker is not None:
            marker = project_python_marker.intersect(marker)

        package.marker = marker

        yield DependencyPackage(dependency=package.to_dependency(), package=package)
class DependencyWalkerError(Exception):
    """Raised when the dependency walker cannot reconcile a requirement with
    a package it elected earlier."""