if pkg_db and self.pkg_db_path.exists(): | if pkg_db and self.pkg_db_path.exists(): | ||||
self.pkg_db_path.unlink() | self.pkg_db_path.unlink() | ||||
def run(self, args: proc.CommandLine, *, cwd: Optional[Pathish] = None) -> None: | |||||
def run(self, args: proc.CommandLine, *, cwd: Optional[Pathish] = None, timeout: Optional[int] = None) -> None: | |||||
"""Execute the 'dds' executable with the given arguments""" | """Execute the 'dds' executable with the given arguments""" | ||||
env = os.environ.copy() | env = os.environ.copy() | ||||
env['DDS_NO_ADD_INITIAL_REPO'] = '1' | env['DDS_NO_ADD_INITIAL_REPO'] = '1' | ||||
proc.check_run([self.path, args], cwd=cwd or self.default_cwd, env=env) | |||||
proc.check_run([self.path, args], cwd=cwd or self.default_cwd, env=env, timeout=timeout) | |||||
def catalog_json_import(self, path: Path) -> None: | def catalog_json_import(self, path: Path) -> None: | ||||
"""Run 'catalog import' to import the given JSON. Only applicable to older 'dds'""" | """Run 'catalog import' to import the given JSON. Only applicable to older 'dds'""" | ||||
toolchain: Optional[Path] = None, | toolchain: Optional[Path] = None, | ||||
build_root: Optional[Path] = None, | build_root: Optional[Path] = None, | ||||
jobs: Optional[int] = None, | jobs: Optional[int] = None, | ||||
more_args: Optional[proc.CommandLine] = None) -> None: | |||||
more_args: Optional[proc.CommandLine] = None, | |||||
timeout: Optional[int] = None) -> None: | |||||
""" | """ | ||||
Run 'dds build' with the given arguments. | Run 'dds build' with the given arguments. | ||||
""" | """ | ||||
toolchain = toolchain or tc_mod.get_default_audit_toolchain() | toolchain = toolchain or tc_mod.get_default_audit_toolchain() | ||||
jobs = jobs or multiprocessing.cpu_count() + 2 | jobs = jobs or multiprocessing.cpu_count() + 2 | ||||
self.run([ | |||||
'build', | |||||
f'--toolchain={toolchain}', | |||||
self.repo_dir_arg, | |||||
self.catalog_path_arg, | |||||
f'--jobs={jobs}', | |||||
f'{self.project_dir_flag}={root}', | |||||
f'--out={build_root}', | |||||
more_args or (), | |||||
]) | |||||
self.run( | |||||
[ | |||||
'build', | |||||
f'--toolchain={toolchain}', | |||||
self.repo_dir_arg, | |||||
self.catalog_path_arg, | |||||
f'--jobs={jobs}', | |||||
f'{self.project_dir_flag}={root}', | |||||
f'--out={build_root}', | |||||
more_args or (), | |||||
], | |||||
timeout=timeout, | |||||
) | |||||
def compile_file(self, | def compile_file(self, | ||||
paths: Iterable[Pathish], | paths: Iterable[Pathish], |
test_tc = args.test_toolchain or toolchain.get_default_audit_toolchain() | test_tc = args.test_toolchain or toolchain.get_default_audit_toolchain() | ||||
build_dir = paths.BUILD_DIR | build_dir = paths.BUILD_DIR | ||||
with toolchain.fixup_toolchain(test_tc) as new_tc: | with toolchain.fixup_toolchain(test_tc) as new_tc: | ||||
dds.build(toolchain=new_tc, root=paths.PROJECT_ROOT, build_root=build_dir, jobs=args.jobs) | |||||
dds.build(toolchain=new_tc, root=paths.PROJECT_ROOT, build_root=build_dir, jobs=args.jobs, timeout=60 * 15) | |||||
return DDSWrapper(build_dir / ('dds' + paths.EXE_SUFFIX)) | return DDSWrapper(build_dir / ('dds' + paths.EXE_SUFFIX)) | ||||
toolchain.get_default_toolchain() if not args.rapid else toolchain.get_default_audit_toolchain()) | toolchain.get_default_toolchain() if not args.rapid else toolchain.get_default_audit_toolchain()) | ||||
with toolchain.fixup_toolchain(main_tc) as new_tc: | with toolchain.fixup_toolchain(main_tc) as new_tc: | ||||
try: | try: | ||||
dds.build(toolchain=new_tc, root=paths.PROJECT_ROOT, build_root=paths.BUILD_DIR, jobs=args.jobs) | |||||
dds.build(toolchain=new_tc, | |||||
root=paths.PROJECT_ROOT, | |||||
build_root=paths.BUILD_DIR, | |||||
jobs=args.jobs, | |||||
timeout=60 * 15) | |||||
except subprocess.CalledProcessError as e: | except subprocess.CalledProcessError as e: | ||||
if args.rapid: | if args.rapid: | ||||
return e.returncode | return e.returncode |
def run(*cmd: CommandLine, | def run(*cmd: CommandLine, | ||||
cwd: Optional[Pathish] = None, | cwd: Optional[Pathish] = None, | ||||
check: bool = False, | check: bool = False, | ||||
env: Optional[Mapping[str, str]] = None) -> ProcessResult: | |||||
env: Optional[Mapping[str, str]] = None, | |||||
timeout: Optional[int] = None) -> ProcessResult: | |||||
timeout = timeout or 60 * 5 | |||||
command = list(flatten_cmd(cmd)) | command = list(flatten_cmd(cmd)) | ||||
res = subprocess.run(command, cwd=cwd, check=False, env=env) | |||||
res = subprocess.run(command, cwd=cwd, check=False, env=env, timeout=timeout) | |||||
if res.returncode and check: | if res.returncode and check: | ||||
raise_error(res) | raise_error(res) | ||||
return res | return res | ||||
def check_run(*cmd: CommandLine, | def check_run(*cmd: CommandLine, | ||||
cwd: Optional[Pathish] = None, | cwd: Optional[Pathish] = None, | ||||
env: Optional[Mapping[str, str]] = None) -> ProcessResult: | |||||
return run(cmd, cwd=cwd, check=True, env=env) | |||||
env: Optional[Mapping[str, str]] = None, | |||||
timeout: Optional[int] = None) -> ProcessResult: | |||||
return run(cmd, cwd=cwd, check=True, env=env, timeout=timeout) |