Skip to content

Commit

Permalink
fix VCS environment variable expansion
Browse files Browse the repository at this point in the history
  • Loading branch information
matteius authored and oz123 committed Oct 22, 2024
1 parent 95ab685 commit b2c094b
Show file tree
Hide file tree
Showing 3 changed files with 438 additions and 36 deletions.
153 changes: 117 additions & 36 deletions pipenv/utils/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
from functools import lru_cache
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Any, AnyStr, Dict, List, Mapping, Optional, Sequence, Union
from typing import Any, AnyStr, Dict, List, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urlparse, urlsplit, urlunparse, urlunsplit

from pipenv.exceptions import PipenvUsageError
from pipenv.patched.pip._internal.models.link import Link
from pipenv.patched.pip._internal.network.download import Downloader
from pipenv.patched.pip._internal.req.constructors import (
Expand Down Expand Up @@ -1083,13 +1084,72 @@ def normalize_vcs_url(vcs_url):
return vcs_url, vcs_ref


def install_req_from_pipfile(name, pipfile):
"""Creates an InstallRequirement from a name and a pipfile entry.
Handles VCS, local & remote paths, and regular named requirements.
"file" and "path" entries are treated the same.
class VCSURLProcessor:
"""Handles processing and environment variable expansion in VCS URLs."""

ENV_VAR_PATTERN = re.compile(r"\${([^}]+)}|\$([a-zA-Z_][a-zA-Z0-9_]*)")

@classmethod
def expand_env_vars(cls, value: str) -> str:
"""
Expands environment variables in a string, with detailed error handling.
Supports both ${VAR} and $VAR syntax.
"""

def _replace_var(match):
var_name = match.group(1) or match.group(2)
if var_name not in os.environ:
raise PipenvUsageError(
f"Environment variable '${var_name}' not found. "
"Please ensure all required environment variables are set."
)
return os.environ[var_name]

try:
return cls.ENV_VAR_PATTERN.sub(_replace_var, value)
except Exception as e:
raise PipenvUsageError(f"Error expanding environment variables: {str(e)}")

@classmethod
def process_vcs_url(cls, url: str) -> str:
"""
Processes a VCS URL, expanding environment variables in individual components.
Handles URLs of the form: vcs+protocol://username:password@hostname/path
"""
parsed = urlparse(url)

# Process each component separately
netloc_parts = parsed.netloc.split("@")
if len(netloc_parts) > 1:
# Handle auth information
auth, host = netloc_parts
if ":" in auth:
username, password = auth.split(":")
username = cls.expand_env_vars(username)
password = cls.expand_env_vars(password)
auth = f"{username}:{password}"
else:
auth = cls.expand_env_vars(auth)
netloc = f"{auth}@{host}"
else:
netloc = cls.expand_env_vars(parsed.netloc)

# Reconstruct URL with processed components
processed_parts = list(parsed)
processed_parts[1] = netloc # Update netloc
processed_parts[2] = cls.expand_env_vars(parsed.path) # Update path

return urlunparse(tuple(processed_parts))


def install_req_from_pipfile(name: str, pipfile: Dict[str, Any]) -> Tuple[Any, Any, str]:
"""
Creates an InstallRequirement from a name and a pipfile entry.
Enhanced to handle environment variables within VCS URLs.
"""
_pipfile = {}
vcs = None

if hasattr(pipfile, "keys"):
_pipfile = dict(pipfile).copy()
else:
Expand All @@ -1098,43 +1158,41 @@ def install_req_from_pipfile(name, pipfile):
_pipfile[vcs] = pipfile

extras = _pipfile.get("extras", [])
extras_str = ""
if extras:
extras_str = f"[{','.join(extras)}]"
extras_str = f"[{','.join(extras)}]" if extras else ""

if not vcs:
vcs = next(iter([vcs for vcs in VCS_LIST if vcs in _pipfile]), None)

if vcs:
vcs_url = _pipfile[vcs]
subdirectory = _pipfile.get("subdirectory", "")
if subdirectory:
subdirectory = f"#subdirectory={subdirectory}"
vcs_url, fallback_ref = normalize_vcs_url(vcs_url)
req_str = f"{vcs_url}@{_pipfile.get('ref', fallback_ref)}{extras_str}"
if not req_str.startswith(f"{vcs}+"):
req_str = f"{vcs}+{req_str}"
if _pipfile.get("editable", False):
req_str = f"-e {name}{extras_str} @ {req_str}{subdirectory}"
else:
req_str = f"{name}{extras_str} @ {req_str}{subdirectory}"
elif "path" in _pipfile:
req_str = file_path_from_pipfile(_pipfile["path"], _pipfile)
elif "file" in _pipfile:
req_str = file_path_from_pipfile(_pipfile["file"], _pipfile)
else:
# We ensure version contains an operator. Default to equals (==)
_pipfile["version"] = version = get_version(pipfile)
if version and not is_star(version) and COMPARE_OP.match(version) is None:
_pipfile["version"] = f"=={version}"
if is_star(version) or version == "==*":
version = ""
req_str = f"{name}{extras_str}{version}"
try:
vcs_url = _pipfile[vcs]
subdirectory = _pipfile.get("subdirectory", "")
if subdirectory:
subdirectory = f"#subdirectory={subdirectory}"

# Process VCS URL with environment variable handling
vcs_url, fallback_ref = normalize_vcs_url(vcs_url)
ref = _pipfile.get("ref", fallback_ref)

# Construct requirement string
req_str = f"{vcs_url}@{ref}{extras_str}"
if not req_str.startswith(f"{vcs}+"):
req_str = f"{vcs}+{req_str}"

if _pipfile.get("editable", False):
req_str = f"-e {name}{extras_str} @ {req_str}{subdirectory}"
else:
req_str = f"{name}{extras_str} @ {req_str}{subdirectory}"

# Handle markers before constructing InstallRequirement
markers = PipenvMarkers.from_pipfile(name, _pipfile)
if markers:
req_str = f"{req_str};{markers}"
except PipenvUsageError as e:
raise PipenvUsageError(
f"Error processing VCS URL for requirement '{name}': {str(e)}"
)
else:
# Handle non-VCS requirements (unchanged)
req_str = handle_non_vcs_requirement(name, _pipfile, extras_str)

# Create InstallRequirement
install_req, _ = expansive_install_req_from_line(
req_str,
comes_from=None,
Expand All @@ -1144,10 +1202,33 @@ def install_req_from_pipfile(name, pipfile):
constraint=False,
expand_env=True,
)

markers = PipenvMarkers.from_pipfile(name, _pipfile)
return install_req, markers, req_str


def handle_non_vcs_requirement(
name: str, _pipfile: Dict[str, Any], extras_str: str
) -> str:
"""Helper function to handle non-VCS requirements."""
if "path" in _pipfile:
return file_path_from_pipfile(_pipfile["path"], _pipfile)
elif "file" in _pipfile:
return file_path_from_pipfile(_pipfile["file"], _pipfile)
else:
version = get_version(_pipfile)
if version and not is_star(version) and COMPARE_OP.match(version) is None:
version = f"=={version}"
if is_star(version) or version == "==*":
version = ""

req_str = f"{name}{extras_str}{version}"
markers = PipenvMarkers.from_pipfile(name, _pipfile)
if markers:
req_str = f"{req_str};{markers}"
return req_str


def from_pipfile(name, pipfile):
install_req, markers, req_str = install_req_from_pipfile(name, pipfile)
if markers:
Expand Down
4 changes: 4 additions & 0 deletions tests/integration/test_install_vcs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import os
from unittest.mock import patch, Mock, MagicMock

import pytest

from pipenv.patched.pip._internal.vcs.git import Git
from pipenv.utils import requirementslib


@pytest.mark.basic
@pytest.mark.install
Expand Down
Loading

0 comments on commit b2c094b

Please sign in to comment.