| import sys | |||||
| from pathlib import Path | |||||
| sys.path.append(str(Path(__file__).absolute().parent.parent / 'tools')) | |||||
| print(sys.path) | |||||
| from .dds import DDS, DDSFixtureParams, scoped_dds, dds_fixture_conf, dds_fixture_conf_1 | from .dds import DDS, DDSFixtureParams, scoped_dds, dds_fixture_conf, dds_fixture_conf_1 | 
| import pytest | import pytest | ||||
| from . import fileutil | |||||
| CommandLineArg = Union[str, Path, int, float] | |||||
| CommandLineArg1 = Union[CommandLineArg, Iterable[CommandLineArg]] | |||||
| CommandLineArg2 = Union[CommandLineArg1, Iterable[CommandLineArg1]] | |||||
| CommandLineArg3 = Union[CommandLineArg2, Iterable[CommandLineArg2]] | |||||
| CommandLineArg4 = Union[CommandLineArg3, Iterable[CommandLineArg3]] | |||||
| CommandLine = Iterable[CommandLineArg4] | |||||
| from dds_ci import proc | |||||
| def _flatten_cmd(cmd: CommandLine) -> Iterable[str]: | |||||
| if isinstance(cmd, (str, Path)): | |||||
| yield str(cmd) | |||||
| elif isinstance(cmd, (int, float)): | |||||
| yield str(cmd) | |||||
| elif hasattr(cmd, '__iter__'): | |||||
| each = (_flatten_cmd(arg) for arg in cmd) # type: ignore | |||||
| for item in each: | |||||
| yield from item | |||||
| else: | |||||
| assert False, f'Invalid command line element: {repr(cmd)}' | |||||
| from . import fileutil | |||||
| class DDS: | class DDS: | ||||
| if self.scratch_dir.exists(): | if self.scratch_dir.exists(): | ||||
| shutil.rmtree(self.scratch_dir) | shutil.rmtree(self.scratch_dir) | ||||
| def run_unchecked(self, cmd: CommandLine, *, | |||||
| def run_unchecked(self, cmd: proc.CommandLine, *, | |||||
| cwd: Path = None) -> subprocess.CompletedProcess: | cwd: Path = None) -> subprocess.CompletedProcess: | ||||
| full_cmd = list(_flatten_cmd(itertools.chain([self.dds_exe], cmd))) | |||||
| return subprocess.run( | |||||
| full_cmd, | |||||
| cwd=cwd or self.source_root, | |||||
| # stdout=subprocess.PIPE, | |||||
| # stderr=subprocess.STDOUT, | |||||
| ) | |||||
| def run(self, cmd: CommandLine, *, | |||||
| full_cmd = itertools.chain([self.dds_exe], cmd) | |||||
| return proc.run(full_cmd, cwd=cwd or self.source_root) | |||||
| def run(self, cmd: proc.CommandLine, *, | |||||
| cwd: Path = None) -> subprocess.CompletedProcess: | cwd: Path = None) -> subprocess.CompletedProcess: | ||||
| cmdline = list(_flatten_cmd(cmd)) | |||||
| cmdline = list(proc.flatten_cmd(cmd)) | |||||
| res = self.run_unchecked(cmd) | res = self.run_unchecked(cmd) | ||||
| if res.returncode != 0: | if res.returncode != 0: | ||||
| raise subprocess.CalledProcessError( | raise subprocess.CalledProcessError( | ||||
| 'sdist', | 'sdist', | ||||
| 'create', | 'create', | ||||
| self.project_dir_arg, | self.project_dir_arg, | ||||
| f'--out={self.build_dir / "stuff.sds"}', | |||||
| f'--out={self.build_dir / "created-sdist.sds"}', | |||||
| ]) | ]) | ||||
| def sdist_export(self) -> subprocess.CompletedProcess: | def sdist_export(self) -> subprocess.CompletedProcess: | 
| from typing import Sequence, NamedTuple | from typing import Sequence, NamedTuple | ||||
| import subprocess | import subprocess | ||||
| import urllib.request | import urllib.request | ||||
| import shutil | |||||
| HERE = Path(__file__).parent.absolute() | |||||
| TOOLS_DIR = HERE | |||||
| PROJECT_ROOT = HERE.parent | |||||
| PREBUILT_DDS = PROJECT_ROOT / '_prebuilt/dds' | |||||
| from self_build import self_build | |||||
| from self_deps_get import self_deps_get | |||||
| from dds_ci import paths, proc | |||||
| class CIOptions(NamedTuple): | class CIOptions(NamedTuple): | ||||
| subprocess.check_call([ | subprocess.check_call([ | ||||
| sys.executable, | sys.executable, | ||||
| '-u', | '-u', | ||||
| str(TOOLS_DIR / 'bootstrap.py'), | |||||
| str(paths.TOOLS_DIR / 'bootstrap.py'), | |||||
| f'--cxx={opts.cxx}', | f'--cxx={opts.cxx}', | ||||
| ]) | ]) | ||||
| print(f'Downloading prebuilt DDS executable: {url}') | print(f'Downloading prebuilt DDS executable: {url}') | ||||
| stream = urllib.request.urlopen(url) | stream = urllib.request.urlopen(url) | ||||
| PREBUILT_DDS.parent.mkdir(exist_ok=True, parents=True) | |||||
| with PREBUILT_DDS.open('wb') as fd: | |||||
| paths.PREBUILT_DDS.parent.mkdir(exist_ok=True, parents=True) | |||||
| with paths.PREBUILT_DDS.open('wb') as fd: | |||||
| while True: | while True: | ||||
| buf = stream.read(1024 * 4) | buf = stream.read(1024 * 4) | ||||
| if not buf: | if not buf: | ||||
| if os.name != 'nt': | if os.name != 'nt': | ||||
| # Mark the binary executable. By default it won't be | # Mark the binary executable. By default it won't be | ||||
| mode = PREBUILT_DDS.stat().st_mode | |||||
| mode = paths.PREBUILT_DDS.stat().st_mode | |||||
| mode |= 0b001_001_001 | mode |= 0b001_001_001 | ||||
| PREBUILT_DDS.chmod(mode) | |||||
| paths.PREBUILT_DDS.chmod(mode) | |||||
| def main(argv: Sequence[str]) -> int: | def main(argv: Sequence[str]) -> int: | ||||
| else: | else: | ||||
| assert False, 'impossible' | assert False, 'impossible' | ||||
| subprocess.check_call([ | |||||
| str(PREBUILT_DDS), | |||||
| proc.check_run( | |||||
| paths.PREBUILT_DDS, | |||||
| 'deps', | 'deps', | ||||
| 'build', | 'build', | ||||
| f'-T{opts.toolchain}', | |||||
| f'--repo-dir={PROJECT_ROOT / "external/repo"}', | |||||
| ]) | |||||
| ('-T', opts.toolchain), | |||||
| ('--repo-dir', paths.EMBEDDED_REPO_DIR), | |||||
| ) | |||||
| subprocess.check_call([ | |||||
| str(PREBUILT_DDS), | |||||
| proc.check_run( | |||||
| paths.PREBUILT_DDS, | |||||
| 'build', | 'build', | ||||
| '--full', | '--full', | ||||
| f'-T{opts.toolchain}', | |||||
| ]) | |||||
| ('-T', opts.toolchain), | |||||
| ) | |||||
| exe_suffix = '.exe' if os.name == 'nt' else '' | |||||
| subprocess.check_call([ | |||||
| sys.executable, | |||||
| '-u', | |||||
| str(TOOLS_DIR / 'self-test.py'), | |||||
| f'--exe={PROJECT_ROOT / f"_build/dds{exe_suffix}"}', | |||||
| f'-T{opts.toolchain}', | |||||
| ]) | |||||
| self_build(paths.CUR_BUILT_DDS, opts.toolchain) | |||||
| if paths.SELF_TEST_REPO_DIR.exists(): | |||||
| shutil.rmtree(paths.SELF_TEST_REPO_DIR) | |||||
| self_deps_get(paths.CUR_BUILT_DDS, paths.SELF_TEST_REPO_DIR) | |||||
| return pytest.main(['-v', '--durations=10']) | return pytest.main(['-v', '--durations=10']) | ||||
| from argparse import ArgumentParser | |||||
| def add_tc_arg(parser: ArgumentParser, *, required=True) -> None: | |||||
| parser.add_argument( | |||||
| '--toolchain', | |||||
| '-T', | |||||
| help='The DDS toolchain to use', | |||||
| required=required) | |||||
| def add_dds_exe_arg(parser: ArgumentParser, *, required=True) -> None: | |||||
| parser.add_argument( | |||||
| '--exe', | |||||
| '-e', | |||||
| help='Path to a DDS executable to use', | |||||
| require=required) | 
| import os | |||||
| from pathlib import Path | |||||
| TOOLS_DIR = Path(__file__).absolute().parent.parent | |||||
| PROJECT_ROOT = TOOLS_DIR.parent | |||||
| BUILD_DIR = PROJECT_ROOT / '_build' | |||||
| PREBUILT_DIR = PROJECT_ROOT / '_prebuilt' | |||||
| EXE_SUFFIX = '.exe' if os.name == 'nt' else '' | |||||
| PREBUILT_DDS = (PREBUILT_DIR / 'dds').with_suffix(EXE_SUFFIX) | |||||
| CUR_BUILT_DDS = (BUILD_DIR / 'dds').with_suffix(EXE_SUFFIX) | |||||
| EMBEDDED_REPO_DIR = PROJECT_ROOT / 'external/repo' | |||||
| SELF_TEST_REPO_DIR = BUILD_DIR / '_self-repo' | 
| from pathlib import PurePath, Path | |||||
| from typing import Iterable, Union | |||||
| import subprocess | |||||
| CommandLineArg = Union[str, PurePath, int, float] | |||||
| CommandLineArg1 = Union[CommandLineArg, Iterable[CommandLineArg]] | |||||
| CommandLineArg2 = Union[CommandLineArg1, Iterable[CommandLineArg1]] | |||||
| CommandLineArg3 = Union[CommandLineArg2, Iterable[CommandLineArg2]] | |||||
| CommandLineArg4 = Union[CommandLineArg3, Iterable[CommandLineArg3]] | |||||
| CommandLine = Union[CommandLineArg4, Iterable[CommandLineArg4]] | |||||
| def flatten_cmd(cmd: CommandLine) -> Iterable[str]: | |||||
| if isinstance(cmd, (str, PurePath)): | |||||
| yield str(cmd) | |||||
| elif isinstance(cmd, (int, float)): | |||||
| yield str(cmd) | |||||
| elif hasattr(cmd, '__iter__'): | |||||
| each = (flatten_cmd(arg) for arg in cmd) # type: ignore | |||||
| for item in each: | |||||
| yield from item | |||||
| else: | |||||
| assert False, f'Invalid command line element: {repr(cmd)}' | |||||
| def run(*cmd: CommandLine, cwd: Path = None) -> subprocess.CompletedProcess: | |||||
| return subprocess.run( | |||||
| list(flatten_cmd(cmd)), # type: ignore | |||||
| cwd=cwd, | |||||
| ) | |||||
| def check_run(*cmd: CommandLine, | |||||
| cwd: Path = None) -> subprocess.CompletedProcess: | |||||
| flat_cmd = list(flatten_cmd(cmd)) # type: ignore | |||||
| res = run(flat_cmd, cwd=cwd) | |||||
| if res.returncode != 0: | |||||
| raise subprocess.CalledProcessError(res.returncode, flat_cmd) | |||||
| return res | 
| import subprocess | import subprocess | ||||
| import sys | import sys | ||||
| from dds_ci import cli | |||||
| ROOT = Path(__file__).parent.parent.absolute() | ROOT = Path(__file__).parent.parent.absolute() | ||||
| def bootstrap_self(exe: Path, toolchain: str): | |||||
| def self_build(exe: Path, toolchain: str): | |||||
| # Copy the exe to another location, as windows refuses to let a binary be | # Copy the exe to another location, as windows refuses to let a binary be | ||||
| # replaced while it is executing | # replaced while it is executing | ||||
| new_exe = ROOT / '_dds.bootstrap-test.exe' | new_exe = ROOT / '_dds.bootstrap-test.exe' | ||||
| def main(argv: List[str]) -> int: | def main(argv: List[str]) -> int: | ||||
| parser = argparse.ArgumentParser() | parser = argparse.ArgumentParser() | ||||
| parser.add_argument( | |||||
| '--exe', | |||||
| '-e', | |||||
| help='Path to the dds executable to test', | |||||
| required=True) | |||||
| parser.add_argument( | |||||
| '--toolchain', | |||||
| '-T', | |||||
| help='The dds toolchain to use while testing', | |||||
| required=True, | |||||
| ) | |||||
| cli.add_tc_arg(parser) | |||||
| cli.add_dds_exe_arg(parser) | |||||
| args = parser.parse_args(argv) | args = parser.parse_args(argv) | ||||
| bootstrap_self(Path(args.exe), args.toolchain) | |||||
| self_build(Path(args.exe), args.toolchain) | |||||
| return 0 | return 0 | ||||
| from pathlib import Path | |||||
| from dds_ci import proc | |||||
| PROJECT_ROOT = Path(__file__).absolute().parent.parent | |||||
| def self_deps_get(dds_exe: Path, repo_dir: Path) -> None: | |||||
| proc.check_run( | |||||
| dds_exe, | |||||
| 'deps', | |||||
| 'get', | |||||
| ('--repo-dir', repo_dir), | |||||
| ('--remote-list', PROJECT_ROOT / 'remote.dds'), | |||||
| ) | |||||
| if __name__ == "__main__": | |||||
| self_deps_get(PROJECT_ROOT / '_build/dds', | |||||
| PROJECT_ROOT / '_build/_self-repo') |