238 lines
6.8 KiB
Python
238 lines
6.8 KiB
Python
""" PEP 610 """
|
|
import json
|
|
import re
|
|
import urllib.parse
|
|
from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union
|
|
|
|
__all__ = [
|
|
"DirectUrl",
|
|
"DirectUrlValidationError",
|
|
"DirInfo",
|
|
"ArchiveInfo",
|
|
"VcsInfo",
|
|
]
|
|
|
|
T = TypeVar("T")
|
|
|
|
DIRECT_URL_METADATA_NAME = "direct_url.json"
|
|
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
|
|
|
|
|
|
class DirectUrlValidationError(Exception):
|
|
pass
|
|
|
|
|
|
def _get(
|
|
d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
|
|
) -> Optional[T]:
|
|
"""Get value from dictionary and verify expected type."""
|
|
if key not in d:
|
|
return default
|
|
value = d[key]
|
|
if not isinstance(value, expected_type):
|
|
raise DirectUrlValidationError(
|
|
"{!r} has unexpected type for {} (expected {})".format(
|
|
value, key, expected_type
|
|
)
|
|
)
|
|
return value
|
|
|
|
|
|
def _get_required(
|
|
d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
|
|
) -> T:
|
|
value = _get(d, expected_type, key, default)
|
|
if value is None:
|
|
raise DirectUrlValidationError(f"{key} must have a value")
|
|
return value
|
|
|
|
|
|
def _exactly_one_of(infos: Iterable[Optional["InfoType"]]) -> "InfoType":
|
|
infos = [info for info in infos if info is not None]
|
|
if not infos:
|
|
raise DirectUrlValidationError(
|
|
"missing one of archive_info, dir_info, vcs_info"
|
|
)
|
|
if len(infos) > 1:
|
|
raise DirectUrlValidationError(
|
|
"more than one of archive_info, dir_info, vcs_info"
|
|
)
|
|
assert infos[0] is not None
|
|
return infos[0]
|
|
|
|
|
|
def _filter_none(**kwargs: Any) -> Dict[str, Any]:
|
|
"""Make dict excluding None values."""
|
|
return {k: v for k, v in kwargs.items() if v is not None}
|
|
|
|
|
|
class VcsInfo:
|
|
name = "vcs_info"
|
|
|
|
def __init__(
|
|
self,
|
|
vcs: str,
|
|
commit_id: str,
|
|
requested_revision: Optional[str] = None,
|
|
) -> None:
|
|
self.vcs = vcs
|
|
self.requested_revision = requested_revision
|
|
self.commit_id = commit_id
|
|
|
|
@classmethod
|
|
def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["VcsInfo"]:
|
|
if d is None:
|
|
return None
|
|
return cls(
|
|
vcs=_get_required(d, str, "vcs"),
|
|
commit_id=_get_required(d, str, "commit_id"),
|
|
requested_revision=_get(d, str, "requested_revision"),
|
|
)
|
|
|
|
def _to_dict(self) -> Dict[str, Any]:
|
|
return _filter_none(
|
|
vcs=self.vcs,
|
|
requested_revision=self.requested_revision,
|
|
commit_id=self.commit_id,
|
|
)
|
|
|
|
|
|
class ArchiveInfo:
|
|
name = "archive_info"
|
|
|
|
def __init__(
|
|
self,
|
|
hash: Optional[str] = None,
|
|
hashes: Optional[Dict[str, str]] = None,
|
|
) -> None:
|
|
# set hashes before hash, since the hash setter will further populate hashes
|
|
self.hashes = hashes
|
|
self.hash = hash
|
|
|
|
@property
|
|
def hash(self) -> Optional[str]:
|
|
return self._hash
|
|
|
|
@hash.setter
|
|
def hash(self, value: Optional[str]) -> None:
|
|
if value is not None:
|
|
# Auto-populate the hashes key to upgrade to the new format automatically.
|
|
# We don't back-populate the legacy hash key from hashes.
|
|
try:
|
|
hash_name, hash_value = value.split("=", 1)
|
|
except ValueError:
|
|
raise DirectUrlValidationError(
|
|
f"invalid archive_info.hash format: {value!r}"
|
|
)
|
|
if self.hashes is None:
|
|
self.hashes = {hash_name: hash_value}
|
|
elif hash_name not in self.hashes:
|
|
self.hashes = self.hashes.copy()
|
|
self.hashes[hash_name] = hash_value
|
|
self._hash = value
|
|
|
|
@classmethod
|
|
def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["ArchiveInfo"]:
|
|
if d is None:
|
|
return None
|
|
return cls(hash=_get(d, str, "hash"), hashes=_get(d, dict, "hashes"))
|
|
|
|
def _to_dict(self) -> Dict[str, Any]:
|
|
return _filter_none(hash=self.hash, hashes=self.hashes)
|
|
|
|
|
|
class DirInfo:
|
|
name = "dir_info"
|
|
|
|
def __init__(
|
|
self,
|
|
editable: bool = False,
|
|
) -> None:
|
|
self.editable = editable
|
|
|
|
@classmethod
|
|
def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["DirInfo"]:
|
|
if d is None:
|
|
return None
|
|
return cls(editable=_get_required(d, bool, "editable", default=False))
|
|
|
|
def _to_dict(self) -> Dict[str, Any]:
|
|
return _filter_none(editable=self.editable or None)
|
|
|
|
|
|
InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
|
|
|
|
|
|
class DirectUrl:
|
|
def __init__(
|
|
self,
|
|
url: str,
|
|
info: InfoType,
|
|
subdirectory: Optional[str] = None,
|
|
) -> None:
|
|
self.url = url
|
|
self.info = info
|
|
self.subdirectory = subdirectory
|
|
|
|
def _remove_auth_from_netloc(self, netloc: str) -> str:
|
|
if "@" not in netloc:
|
|
return netloc
|
|
user_pass, netloc_no_user_pass = netloc.split("@", 1)
|
|
if (
|
|
isinstance(self.info, VcsInfo)
|
|
and self.info.vcs == "git"
|
|
and user_pass == "git"
|
|
):
|
|
return netloc
|
|
if ENV_VAR_RE.match(user_pass):
|
|
return netloc
|
|
return netloc_no_user_pass
|
|
|
|
@property
|
|
def redacted_url(self) -> str:
|
|
"""url with user:password part removed unless it is formed with
|
|
environment variables as specified in PEP 610, or it is ``git``
|
|
in the case of a git URL.
|
|
"""
|
|
purl = urllib.parse.urlsplit(self.url)
|
|
netloc = self._remove_auth_from_netloc(purl.netloc)
|
|
surl = urllib.parse.urlunsplit(
|
|
(purl.scheme, netloc, purl.path, purl.query, purl.fragment)
|
|
)
|
|
return surl
|
|
|
|
def validate(self) -> None:
|
|
self.from_dict(self.to_dict())
|
|
|
|
@classmethod
|
|
def from_dict(cls, d: Dict[str, Any]) -> "DirectUrl":
|
|
return DirectUrl(
|
|
url=_get_required(d, str, "url"),
|
|
subdirectory=_get(d, str, "subdirectory"),
|
|
info=_exactly_one_of(
|
|
[
|
|
ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
|
|
DirInfo._from_dict(_get(d, dict, "dir_info")),
|
|
VcsInfo._from_dict(_get(d, dict, "vcs_info")),
|
|
]
|
|
),
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
res = _filter_none(
|
|
url=self.redacted_url,
|
|
subdirectory=self.subdirectory,
|
|
)
|
|
res[self.info.name] = self.info._to_dict()
|
|
return res
|
|
|
|
@classmethod
|
|
def from_json(cls, s: str) -> "DirectUrl":
|
|
return cls.from_dict(json.loads(s))
|
|
|
|
def to_json(self) -> str:
|
|
return json.dumps(self.to_dict(), sort_keys=True)
|
|
|
|
def is_local_editable(self) -> bool:
|
|
return isinstance(self.info, DirInfo) and self.info.editable
|