first commit

This commit is contained in:
Victor
2023-11-16 16:57:13 +01:00
commit a8fb1fd811
1477 changed files with 275005 additions and 0 deletions

View File

@@ -0,0 +1,15 @@
# Expose a limited set of classes and functions so callers outside of
# the vcs package don't need to import deeper than `pip._internal.vcs`.
# (The test directory may still need to import from a vcs sub-package.)
# Import all vcs modules to register each VCS in the VcsSupport object.
import pip._internal.vcs.bazaar
import pip._internal.vcs.git
import pip._internal.vcs.mercurial
import pip._internal.vcs.subversion # noqa: F401
from pip._internal.vcs.versioncontrol import ( # noqa: F401
RemoteNotFoundError,
RemoteNotValidError,
is_url,
make_vcs_requirement_url,
vcs,
)

View File

@@ -0,0 +1,112 @@
import logging
from typing import List, Optional, Tuple
from pip._internal.utils.misc import HiddenText, display_path
from pip._internal.utils.subprocess import make_command
from pip._internal.utils.urls import path_to_url
from pip._internal.vcs.versioncontrol import (
AuthInfo,
RemoteNotFoundError,
RevOptions,
VersionControl,
vcs,
)
logger = logging.getLogger(__name__)
class Bazaar(VersionControl):
name = "bzr"
dirname = ".bzr"
repo_name = "branch"
schemes = (
"bzr+http",
"bzr+https",
"bzr+ssh",
"bzr+sftp",
"bzr+ftp",
"bzr+lp",
"bzr+file",
)
@staticmethod
def get_base_rev_args(rev: str) -> List[str]:
return ["-r", rev]
def fetch_new(
self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
) -> None:
rev_display = rev_options.to_display()
logger.info(
"Checking out %s%s to %s",
url,
rev_display,
display_path(dest),
)
if verbosity <= 0:
flag = "--quiet"
elif verbosity == 1:
flag = ""
else:
flag = f"-{'v'*verbosity}"
cmd_args = make_command(
"checkout", "--lightweight", flag, rev_options.to_args(), url, dest
)
self.run_command(cmd_args)
def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
self.run_command(make_command("switch", url), cwd=dest)
def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
output = self.run_command(
make_command("info"), show_stdout=False, stdout_only=True, cwd=dest
)
if output.startswith("Standalone "):
# Older versions of pip used to create standalone branches.
# Convert the standalone branch to a checkout by calling "bzr bind".
cmd_args = make_command("bind", "-q", url)
self.run_command(cmd_args, cwd=dest)
cmd_args = make_command("update", "-q", rev_options.to_args())
self.run_command(cmd_args, cwd=dest)
@classmethod
def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
# hotfix the URL scheme after removing bzr+ from bzr+ssh:// re-add it
url, rev, user_pass = super().get_url_rev_and_auth(url)
if url.startswith("ssh://"):
url = "bzr+" + url
return url, rev, user_pass
@classmethod
def get_remote_url(cls, location: str) -> str:
urls = cls.run_command(
["info"], show_stdout=False, stdout_only=True, cwd=location
)
for line in urls.splitlines():
line = line.strip()
for x in ("checkout of branch: ", "parent branch: "):
if line.startswith(x):
repo = line.split(x)[1]
if cls._is_local_repository(repo):
return path_to_url(repo)
return repo
raise RemoteNotFoundError
@classmethod
def get_revision(cls, location: str) -> str:
revision = cls.run_command(
["revno"],
show_stdout=False,
stdout_only=True,
cwd=location,
)
return revision.splitlines()[-1]
@classmethod
def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
"""Always assume the versions don't match"""
return False
vcs.register(Bazaar)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,163 @@
import configparser
import logging
import os
from typing import List, Optional, Tuple
from pip._internal.exceptions import BadCommand, InstallationError
from pip._internal.utils.misc import HiddenText, display_path
from pip._internal.utils.subprocess import make_command
from pip._internal.utils.urls import path_to_url
from pip._internal.vcs.versioncontrol import (
RevOptions,
VersionControl,
find_path_to_project_root_from_repo_root,
vcs,
)
logger = logging.getLogger(__name__)
class Mercurial(VersionControl):
name = "hg"
dirname = ".hg"
repo_name = "clone"
schemes = (
"hg+file",
"hg+http",
"hg+https",
"hg+ssh",
"hg+static-http",
)
@staticmethod
def get_base_rev_args(rev: str) -> List[str]:
return ["-r", rev]
def fetch_new(
self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
) -> None:
rev_display = rev_options.to_display()
logger.info(
"Cloning hg %s%s to %s",
url,
rev_display,
display_path(dest),
)
if verbosity <= 0:
flags: Tuple[str, ...] = ("--quiet",)
elif verbosity == 1:
flags = ()
elif verbosity == 2:
flags = ("--verbose",)
else:
flags = ("--verbose", "--debug")
self.run_command(make_command("clone", "--noupdate", *flags, url, dest))
self.run_command(
make_command("update", *flags, rev_options.to_args()),
cwd=dest,
)
def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
repo_config = os.path.join(dest, self.dirname, "hgrc")
config = configparser.RawConfigParser()
try:
config.read(repo_config)
config.set("paths", "default", url.secret)
with open(repo_config, "w") as config_file:
config.write(config_file)
except (OSError, configparser.NoSectionError) as exc:
logger.warning("Could not switch Mercurial repository to %s: %s", url, exc)
else:
cmd_args = make_command("update", "-q", rev_options.to_args())
self.run_command(cmd_args, cwd=dest)
def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
self.run_command(["pull", "-q"], cwd=dest)
cmd_args = make_command("update", "-q", rev_options.to_args())
self.run_command(cmd_args, cwd=dest)
@classmethod
def get_remote_url(cls, location: str) -> str:
url = cls.run_command(
["showconfig", "paths.default"],
show_stdout=False,
stdout_only=True,
cwd=location,
).strip()
if cls._is_local_repository(url):
url = path_to_url(url)
return url.strip()
@classmethod
def get_revision(cls, location: str) -> str:
"""
Return the repository-local changeset revision number, as an integer.
"""
current_revision = cls.run_command(
["parents", "--template={rev}"],
show_stdout=False,
stdout_only=True,
cwd=location,
).strip()
return current_revision
@classmethod
def get_requirement_revision(cls, location: str) -> str:
"""
Return the changeset identification hash, as a 40-character
hexadecimal string
"""
current_rev_hash = cls.run_command(
["parents", "--template={node}"],
show_stdout=False,
stdout_only=True,
cwd=location,
).strip()
return current_rev_hash
@classmethod
def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
"""Always assume the versions don't match"""
return False
@classmethod
def get_subdirectory(cls, location: str) -> Optional[str]:
"""
Return the path to Python project root, relative to the repo root.
Return None if the project root is in the repo root.
"""
# find the repo root
repo_root = cls.run_command(
["root"], show_stdout=False, stdout_only=True, cwd=location
).strip()
if not os.path.isabs(repo_root):
repo_root = os.path.abspath(os.path.join(location, repo_root))
return find_path_to_project_root_from_repo_root(location, repo_root)
@classmethod
def get_repository_root(cls, location: str) -> Optional[str]:
loc = super().get_repository_root(location)
if loc:
return loc
try:
r = cls.run_command(
["root"],
cwd=location,
show_stdout=False,
stdout_only=True,
on_returncode="raise",
log_failed_cmd=False,
)
except BadCommand:
logger.debug(
"could not determine if %s is under hg control "
"because hg is not available",
location,
)
return None
except InstallationError:
return None
return os.path.normpath(r.rstrip("\r\n"))
vcs.register(Mercurial)

View File

@@ -0,0 +1,324 @@
import logging
import os
import re
from typing import List, Optional, Tuple
from pip._internal.utils.misc import (
HiddenText,
display_path,
is_console_interactive,
is_installable_dir,
split_auth_from_netloc,
)
from pip._internal.utils.subprocess import CommandArgs, make_command
from pip._internal.vcs.versioncontrol import (
AuthInfo,
RemoteNotFoundError,
RevOptions,
VersionControl,
vcs,
)
logger = logging.getLogger(__name__)
_svn_xml_url_re = re.compile('url="([^"]+)"')
_svn_rev_re = re.compile(r'committed-rev="(\d+)"')
_svn_info_xml_rev_re = re.compile(r'\s*revision="(\d+)"')
_svn_info_xml_url_re = re.compile(r"<url>(.*)</url>")
class Subversion(VersionControl):
name = "svn"
dirname = ".svn"
repo_name = "checkout"
schemes = ("svn+ssh", "svn+http", "svn+https", "svn+svn", "svn+file")
@classmethod
def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
return True
@staticmethod
def get_base_rev_args(rev: str) -> List[str]:
return ["-r", rev]
@classmethod
def get_revision(cls, location: str) -> str:
"""
Return the maximum revision for all files under a given location
"""
# Note: taken from setuptools.command.egg_info
revision = 0
for base, dirs, _ in os.walk(location):
if cls.dirname not in dirs:
dirs[:] = []
continue # no sense walking uncontrolled subdirs
dirs.remove(cls.dirname)
entries_fn = os.path.join(base, cls.dirname, "entries")
if not os.path.exists(entries_fn):
# FIXME: should we warn?
continue
dirurl, localrev = cls._get_svn_url_rev(base)
if base == location:
assert dirurl is not None
base = dirurl + "/" # save the root url
elif not dirurl or not dirurl.startswith(base):
dirs[:] = []
continue # not part of the same svn tree, skip it
revision = max(revision, localrev)
return str(revision)
@classmethod
def get_netloc_and_auth(
cls, netloc: str, scheme: str
) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
"""
This override allows the auth information to be passed to svn via the
--username and --password options instead of via the URL.
"""
if scheme == "ssh":
# The --username and --password options can't be used for
# svn+ssh URLs, so keep the auth information in the URL.
return super().get_netloc_and_auth(netloc, scheme)
return split_auth_from_netloc(netloc)
@classmethod
def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
# hotfix the URL scheme after removing svn+ from svn+ssh:// re-add it
url, rev, user_pass = super().get_url_rev_and_auth(url)
if url.startswith("ssh://"):
url = "svn+" + url
return url, rev, user_pass
@staticmethod
def make_rev_args(
username: Optional[str], password: Optional[HiddenText]
) -> CommandArgs:
extra_args: CommandArgs = []
if username:
extra_args += ["--username", username]
if password:
extra_args += ["--password", password]
return extra_args
@classmethod
def get_remote_url(cls, location: str) -> str:
# In cases where the source is in a subdirectory, we have to look up in
# the location until we find a valid project root.
orig_location = location
while not is_installable_dir(location):
last_location = location
location = os.path.dirname(location)
if location == last_location:
# We've traversed up to the root of the filesystem without
# finding a Python project.
logger.warning(
"Could not find Python project for directory %s (tried all "
"parent directories)",
orig_location,
)
raise RemoteNotFoundError
url, _rev = cls._get_svn_url_rev(location)
if url is None:
raise RemoteNotFoundError
return url
@classmethod
def _get_svn_url_rev(cls, location: str) -> Tuple[Optional[str], int]:
from pip._internal.exceptions import InstallationError
entries_path = os.path.join(location, cls.dirname, "entries")
if os.path.exists(entries_path):
with open(entries_path) as f:
data = f.read()
else: # subversion >= 1.7 does not have the 'entries' file
data = ""
url = None
if data.startswith("8") or data.startswith("9") or data.startswith("10"):
entries = list(map(str.splitlines, data.split("\n\x0c\n")))
del entries[0][0] # get rid of the '8'
url = entries[0][3]
revs = [int(d[9]) for d in entries if len(d) > 9 and d[9]] + [0]
elif data.startswith("<?xml"):
match = _svn_xml_url_re.search(data)
if not match:
raise ValueError(f"Badly formatted data: {data!r}")
url = match.group(1) # get repository URL
revs = [int(m.group(1)) for m in _svn_rev_re.finditer(data)] + [0]
else:
try:
# subversion >= 1.7
# Note that using get_remote_call_options is not necessary here
# because `svn info` is being run against a local directory.
# We don't need to worry about making sure interactive mode
# is being used to prompt for passwords, because passwords
# are only potentially needed for remote server requests.
xml = cls.run_command(
["info", "--xml", location],
show_stdout=False,
stdout_only=True,
)
match = _svn_info_xml_url_re.search(xml)
assert match is not None
url = match.group(1)
revs = [int(m.group(1)) for m in _svn_info_xml_rev_re.finditer(xml)]
except InstallationError:
url, revs = None, []
if revs:
rev = max(revs)
else:
rev = 0
return url, rev
@classmethod
def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
"""Always assume the versions don't match"""
return False
def __init__(self, use_interactive: Optional[bool] = None) -> None:
if use_interactive is None:
use_interactive = is_console_interactive()
self.use_interactive = use_interactive
# This member is used to cache the fetched version of the current
# ``svn`` client.
# Special value definitions:
# None: Not evaluated yet.
# Empty tuple: Could not parse version.
self._vcs_version: Optional[Tuple[int, ...]] = None
super().__init__()
def call_vcs_version(self) -> Tuple[int, ...]:
"""Query the version of the currently installed Subversion client.
:return: A tuple containing the parts of the version information or
``()`` if the version returned from ``svn`` could not be parsed.
:raises: BadCommand: If ``svn`` is not installed.
"""
# Example versions:
# svn, version 1.10.3 (r1842928)
# compiled Feb 25 2019, 14:20:39 on x86_64-apple-darwin17.0.0
# svn, version 1.7.14 (r1542130)
# compiled Mar 28 2018, 08:49:13 on x86_64-pc-linux-gnu
# svn, version 1.12.0-SlikSvn (SlikSvn/1.12.0)
# compiled May 28 2019, 13:44:56 on x86_64-microsoft-windows6.2
version_prefix = "svn, version "
version = self.run_command(["--version"], show_stdout=False, stdout_only=True)
if not version.startswith(version_prefix):
return ()
version = version[len(version_prefix) :].split()[0]
version_list = version.partition("-")[0].split(".")
try:
parsed_version = tuple(map(int, version_list))
except ValueError:
return ()
return parsed_version
def get_vcs_version(self) -> Tuple[int, ...]:
"""Return the version of the currently installed Subversion client.
If the version of the Subversion client has already been queried,
a cached value will be used.
:return: A tuple containing the parts of the version information or
``()`` if the version returned from ``svn`` could not be parsed.
:raises: BadCommand: If ``svn`` is not installed.
"""
if self._vcs_version is not None:
# Use cached version, if available.
# If parsing the version failed previously (empty tuple),
# do not attempt to parse it again.
return self._vcs_version
vcs_version = self.call_vcs_version()
self._vcs_version = vcs_version
return vcs_version
def get_remote_call_options(self) -> CommandArgs:
"""Return options to be used on calls to Subversion that contact the server.
These options are applicable for the following ``svn`` subcommands used
in this class.
- checkout
- switch
- update
:return: A list of command line arguments to pass to ``svn``.
"""
if not self.use_interactive:
# --non-interactive switch is available since Subversion 0.14.4.
# Subversion < 1.8 runs in interactive mode by default.
return ["--non-interactive"]
svn_version = self.get_vcs_version()
# By default, Subversion >= 1.8 runs in non-interactive mode if
# stdin is not a TTY. Since that is how pip invokes SVN, in
# call_subprocess(), pip must pass --force-interactive to ensure
# the user can be prompted for a password, if required.
# SVN added the --force-interactive option in SVN 1.8. Since
# e.g. RHEL/CentOS 7, which is supported until 2024, ships with
# SVN 1.7, pip should continue to support SVN 1.7. Therefore, pip
# can't safely add the option if the SVN version is < 1.8 (or unknown).
if svn_version >= (1, 8):
return ["--force-interactive"]
return []
def fetch_new(
self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
) -> None:
rev_display = rev_options.to_display()
logger.info(
"Checking out %s%s to %s",
url,
rev_display,
display_path(dest),
)
if verbosity <= 0:
flag = "--quiet"
else:
flag = ""
cmd_args = make_command(
"checkout",
flag,
self.get_remote_call_options(),
rev_options.to_args(),
url,
dest,
)
self.run_command(cmd_args)
def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
cmd_args = make_command(
"switch",
self.get_remote_call_options(),
rev_options.to_args(),
url,
dest,
)
self.run_command(cmd_args)
def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
cmd_args = make_command(
"update",
self.get_remote_call_options(),
rev_options.to_args(),
dest,
)
self.run_command(cmd_args)
vcs.register(Subversion)

File diff suppressed because it is too large Load Diff