|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449 |
- """
- Script for populating a repository with packages declaratively.
- """
-
- import argparse
- import itertools
- import json
- import os
- import re
- import shutil
- import stat
- import sys
- import tarfile
- import tempfile
- from concurrent.futures import ThreadPoolExecutor
- from contextlib import contextmanager
- from pathlib import Path
- from subprocess import check_call
- from threading import Lock
- from typing import (Any, Dict, Iterable, Iterator, NamedTuple, NoReturn, Optional, Sequence, Tuple, Type, TypeVar,
- Union)
- from urllib import request
-
- from semver import VersionInfo
- from typing_extensions import Protocol
-
- T = TypeVar('T')
-
- I32_MAX = 0xffff_ffff - 1
- MAX_VERSION = VersionInfo(I32_MAX, I32_MAX, I32_MAX)
-
- REPO_ROOT = Path(__file__).resolve().absolute().parent.parent
-
-
- def _get_dds_exe() -> Path:
- suffix = '.exe' if os.name == 'nt' else ''
- dirs = [REPO_ROOT / '_build', REPO_ROOT / '_prebuilt']
- for d in dirs:
- exe = d / ('dds' + suffix)
- if exe.is_file():
- return exe
- raise RuntimeError('Unable to find a dds.exe to use')
-
-
- class Dependency(NamedTuple):
- name: str
- low: VersionInfo
- high: VersionInfo
-
- @classmethod
- def parse(cls: Type[T], depstr: str) -> T:
- mat = re.match(r'(.+?)([\^~\+@])(.+?)$', depstr)
- if not mat:
- raise ValueError(f'Invalid dependency string "{depstr}"')
- name, kind, version_str = mat.groups()
- version = VersionInfo.parse(version_str)
- high = {
- '^': version.bump_major,
- '~': version.bump_minor,
- '@': version.bump_patch,
- '+': lambda: MAX_VERSION,
- }[kind]()
- return cls(name, version, high)
-
-
- def glob_if_exists(path: Path, pat: str) -> Iterable[Path]:
- try:
- yield from path.glob(pat)
- except FileNotFoundError:
- yield from ()
-
-
- class MoveTransform(NamedTuple):
- frm: str
- to: str
- strip_components: int = 0
- include: Sequence[str] = []
- exclude: Sequence[str] = []
-
- @classmethod
- def parse_data(cls: Type[T], data: Any) -> T:
- return cls(frm=data.pop('from'),
- to=data.pop('to'),
- include=data.pop('include', []),
- strip_components=data.pop('strip-components', 0),
- exclude=data.pop('exclude', []))
-
- def apply_to(self, p: Path) -> None:
- src = p / self.frm
- dest = p / self.to
- if src.is_file():
- self.do_reloc_file(src, dest)
- return
-
- inc_pats = self.include or ['**/*']
- include = set(itertools.chain.from_iterable(glob_if_exists(src, pat) for pat in inc_pats))
- exclude = set(itertools.chain.from_iterable(glob_if_exists(src, pat) for pat in self.exclude))
- to_reloc = sorted(include - exclude)
- for source_file in to_reloc:
- relpath = source_file.relative_to(src)
- strip_relpath = Path('/'.join(relpath.parts[self.strip_components:]))
- dest_file = dest / strip_relpath
- self.do_reloc_file(source_file, dest_file)
-
- def do_reloc_file(self, src: Path, dest: Path) -> None:
- if src.is_dir():
- dest.mkdir(exist_ok=True, parents=True)
- else:
- dest.parent.mkdir(exist_ok=True, parents=True)
- src.rename(dest)
-
-
- class CopyTransform(MoveTransform):
- def do_reloc_file(self, src: Path, dest: Path) -> None:
- if src.is_dir():
- dest.mkdir(exist_ok=True, parents=True)
- else:
- shutil.copy2(src, dest)
-
-
- class OneEdit(NamedTuple):
- kind: str
- line: int
- content: Optional[str] = None
-
- @classmethod
- def parse_data(cls, data: Dict) -> 'OneEdit':
- return OneEdit(data.pop('kind'), data.pop('line'), data.pop('content', None))
-
- def apply_to(self, fpath: Path) -> None:
- fn = {
- 'insert': self._insert,
- # 'delete': self._delete,
- }[self.kind]
- fn(fpath)
-
- def _insert(self, fpath: Path) -> None:
- content = fpath.read_bytes()
- lines = content.split(b'\n')
- assert self.content
- lines.insert(self.line, self.content.encode())
- fpath.write_bytes(b'\n'.join(lines))
-
-
- class EditTransform(NamedTuple):
- path: str
- edits: Sequence[OneEdit] = []
-
- @classmethod
- def parse_data(cls, data: Dict) -> 'EditTransform':
- return EditTransform(data.pop('path'), [OneEdit.parse_data(ed) for ed in data.pop('edits')])
-
- def apply_to(self, p: Path) -> None:
- fpath = p / self.path
- for ed in self.edits:
- ed.apply_to(fpath)
-
-
- class WriteTransform(NamedTuple):
- path: str
- content: str
-
- @classmethod
- def parse_data(self, data: Dict) -> 'WriteTransform':
- return WriteTransform(data.pop('path'), data.pop('content'))
-
- def apply_to(self, p: Path) -> None:
- fpath = p / self.path
- print('Writing to file', p, self.content)
- fpath.write_text(self.content)
-
-
- class RemoveTransform(NamedTuple):
- path: Path
- only_matching: Sequence[str] = ()
-
- @classmethod
- def parse_data(self, d: Any) -> 'RemoveTransform':
- p = d.pop('path')
- pat = d.pop('only-matching')
- return RemoveTransform(Path(p), pat)
-
- def apply_to(self, p: Path) -> None:
- if p.is_dir():
- self._apply_dir(p)
- else:
- p.unlink()
-
- def _apply_dir(self, p: Path) -> None:
- abspath = p / self.path
- if not self.only_matching:
- # Remove everything
- if abspath.is_dir():
- better_rmtree(abspath)
- else:
- abspath.unlink()
- return
-
- for pat in self.only_matching:
- items = glob_if_exists(abspath, pat)
- for f in items:
- if f.is_dir():
- better_rmtree(f)
- else:
- f.unlink()
-
-
- class FSTransform(NamedTuple):
- copy: Optional[CopyTransform] = None
- move: Optional[MoveTransform] = None
- remove: Optional[RemoveTransform] = None
- write: Optional[WriteTransform] = None
- edit: Optional[EditTransform] = None
-
- def apply_to(self, p: Path) -> None:
- for tr in (self.copy, self.move, self.remove, self.write, self.edit):
- if tr:
- tr.apply_to(p)
-
- @classmethod
- def parse_data(self, data: Any) -> 'FSTransform':
- move = data.pop('move', None)
- copy = data.pop('copy', None)
- remove = data.pop('remove', None)
- write = data.pop('write', None)
- edit = data.pop('edit', None)
- return FSTransform(
- copy=None if copy is None else CopyTransform.parse_data(copy),
- move=None if move is None else MoveTransform.parse_data(move),
- remove=None if remove is None else RemoveTransform.parse_data(remove),
- write=None if write is None else WriteTransform.parse_data(write),
- edit=None if edit is None else EditTransform.parse_data(edit),
- )
-
-
- class HTTPRemoteSpec(NamedTuple):
- url: str
- transform: Sequence[FSTransform]
-
- @classmethod
- def parse_data(cls, data: Dict[str, Any]) -> 'HTTPRemoteSpec':
- url = data.pop('url')
- trs = [FSTransform.parse_data(tr) for tr in data.pop('transforms', [])]
- return HTTPRemoteSpec(url, trs)
-
- def make_local_dir(self):
- return http_dl_unpack(self.url)
-
-
- class GitSpec(NamedTuple):
- url: str
- ref: str
- transform: Sequence[FSTransform]
-
- @classmethod
- def parse_data(cls, data: Dict[str, Any]) -> 'GitSpec':
- ref = data.pop('ref')
- url = data.pop('url')
- trs = [FSTransform.parse_data(tr) for tr in data.pop('transform', [])]
- return GitSpec(url=url, ref=ref, transform=trs)
-
- @contextmanager
- def make_local_dir(self) -> Iterator[Path]:
- tdir = Path(tempfile.mkdtemp())
- try:
- check_call(['git', 'clone', '--quiet', self.url, f'--depth=1', f'--branch={self.ref}', str(tdir)])
- yield tdir
- finally:
- better_rmtree(tdir)
-
-
- class ForeignPackage(NamedTuple):
- remote: Union[HTTPRemoteSpec, GitSpec]
- transform: Sequence[FSTransform]
- auto_lib: Optional[Tuple]
-
- @classmethod
- def parse_data(cls, data: Dict[str, Any]) -> 'ForeignPackage':
- git = data.pop('git', None)
- http = data.pop('http', None)
- chosen = git or http
- assert chosen, data
- trs = data.pop('transform', [])
- al = data.pop('auto-lib', None)
- return ForeignPackage(
- remote=GitSpec.parse_data(git) if git else HTTPRemoteSpec.parse_data(http),
- transform=[FSTransform.parse_data(tr) for tr in trs],
- auto_lib=al.split('/') if al else None,
- )
-
- @contextmanager
- def make_local_dir(self, name: str, ver: VersionInfo) -> Iterator[Path]:
- with self.remote.make_local_dir() as tdir:
- for tr in self.transform:
- tr.apply_to(tdir)
- if self.auto_lib:
- pkg_json = {
- 'name': name,
- 'version': str(ver),
- 'namespace': self.auto_lib[0],
- }
- lib_json = {'name': self.auto_lib[1]}
- tdir.joinpath('package.jsonc').write_text(json.dumps(pkg_json))
- tdir.joinpath('library.jsonc').write_text(json.dumps(lib_json))
- yield tdir
-
-
- class SpecPackage(NamedTuple):
- name: str
- version: VersionInfo
- depends: Sequence[Dependency]
- description: str
- remote: ForeignPackage
-
- @classmethod
- def parse_data(cls, name: str, version: str, data: Any) -> 'SpecPackage':
- deps = data.pop('depends', [])
- desc = data.pop('description', '[No description]')
- remote = ForeignPackage.parse_data(data.pop('remote'))
- return SpecPackage(name,
- VersionInfo.parse(version),
- description=desc,
- depends=[Dependency.parse(d) for d in deps],
- remote=remote)
-
-
- def iter_spec(path: Path) -> Iterable[SpecPackage]:
- data = json.loads(path.read_text())
- pkgs = data['packages']
- return iter_spec_packages(pkgs)
-
-
- def iter_spec_packages(data: Dict[str, Any]) -> Iterable[SpecPackage]:
- for name, versions in data.items():
- for version, defin in versions.items():
- yield SpecPackage.parse_data(name, version, defin)
-
-
- def _on_rm_error_win32(fn, filepath, _exc_info):
- p = Path(filepath)
- p.chmod(stat.S_IWRITE)
- p.unlink()
-
-
- def better_rmtree(dir: Path) -> None:
- if os.name == 'nt':
- shutil.rmtree(dir, onerror=_on_rm_error_win32)
- else:
- shutil.rmtree(dir)
-
-
- @contextmanager
- def http_dl_unpack(url: str) -> Iterator[Path]:
- req = request.urlopen(url)
- tdir = Path(tempfile.mkdtemp())
- ofile = tdir / '.dl-archive'
- try:
- with ofile.open('wb') as fd:
- fd.write(req.read())
- tf = tarfile.open(ofile)
- tf.extractall(tdir)
- tf.close()
- ofile.unlink()
- subdir = next(iter(Path(tdir).iterdir()))
- yield subdir
- finally:
- better_rmtree(tdir)
-
-
- @contextmanager
- def spec_as_local_tgz(dds_exe: Path, spec: SpecPackage) -> Iterator[Path]:
- with spec.remote.make_local_dir(spec.name, spec.version) as clone_dir:
- out_tgz = clone_dir / 'sdist.tgz'
- check_call([str(dds_exe), 'sdist', 'create', f'--project={clone_dir}', f'--out={out_tgz}'])
- yield out_tgz
-
-
- class Repository:
- def __init__(self, dds_exe: Path, path: Path) -> None:
- self._path = path
- self._dds_exe = dds_exe
- self._import_lock = Lock()
-
- @property
- def pkg_dir(self) -> Path:
- return self._path / 'pkg'
-
- @classmethod
- def create(cls, dds_exe: Path, dirpath: Path, name: str) -> 'Repository':
- check_call([str(dds_exe), 'repoman', 'init', str(dirpath), f'--name={name}'])
- return Repository(dds_exe, dirpath)
-
- @classmethod
- def open(cls, dds_exe: Path, dirpath: Path) -> 'Repository':
- return Repository(dds_exe, dirpath)
-
- def import_tgz(self, path: Path) -> None:
- check_call([str(self._dds_exe), 'repoman', 'import', str(self._path), str(path)])
-
- def remove(self, name: str) -> None:
- check_call([str(self._dds_exe), 'repoman', 'remove', str(self._path), name])
-
- def spec_import(self, spec: Path) -> None:
- all_specs = iter_spec(spec)
- want_import = (s for s in all_specs if self._shoule_import(s))
- pool = ThreadPoolExecutor(10)
- futs = pool.map(self._get_and_import, want_import)
- for res in futs:
- pass
-
- def _shoule_import(self, spec: SpecPackage) -> bool:
- expect_file = self.pkg_dir / spec.name / str(spec.version) / 'sdist.tar.gz'
- return not expect_file.is_file()
-
- def _get_and_import(self, spec: SpecPackage) -> None:
- print(f'Import: {spec.name}@{spec.version}')
- with spec_as_local_tgz(self._dds_exe, spec) as tgz:
- with self._import_lock:
- self.import_tgz(tgz)
-
-
- class Arguments(Protocol):
- dir: Path
- spec: Path
- dds_exe: Path
-
-
- def main(argv: Sequence[str]) -> int:
- parser = argparse.ArgumentParser()
- parser.add_argument('--dds-exe', type=Path, help='Path to the dds executable to use', default=_get_dds_exe())
- parser.add_argument('--dir', '-d', help='Path to a repository to manage', required=True, type=Path)
- parser.add_argument('--spec',
- metavar='<spec-path>',
- type=Path,
- required=True,
- help='Provide a JSON document specifying how to obtain an import some packages')
- args: Arguments = parser.parse_args(argv)
- repo = Repository.open(args.dds_exe, args.dir)
- repo.spec_import(args.spec)
-
- return 0
-
-
- def start() -> NoReturn:
- sys.exit(main(sys.argv[1:]))
-
-
- if __name__ == "__main__":
- start()
|