Nelze vybrat více než 25 témat. Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

import_test.py 2.1KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import json
  2. from pathlib import Path
  3. from functools import partial
  4. from concurrent.futures import ThreadPoolExecutor
  5. from http.server import SimpleHTTPRequestHandler, HTTPServer
  6. import time
  7. import pytest
  8. from tests import dds, DDS
  9. from tests.fileutil import ensure_dir
  class DirectoryServingHTTPRequestHandler(SimpleHTTPRequestHandler):
      """Request handler that serves files from a caller-chosen directory.

      The directory is passed through the required ``dir`` keyword argument
      (for example bound ahead of time with ``functools.partial``). Request
      paths are first resolved by the base class, then made relative to the
      current working directory and re-rooted under ``dir``.
      """

      def __init__(self, *args, **kwargs) -> None:
          """Store the ``dir`` keyword and forward the rest to the base class.

          Raises:
              KeyError: if the ``dir`` keyword argument is missing.
          """
          # Set the attribute *before* delegating: the stdlib request handler
          # processes the request inside its constructor, so translate_path()
          # can run before super().__init__() returns.
          self.dir = kwargs.pop('dir')
          super().__init__(*args, **kwargs)

      def translate_path(self, path) -> str:
          """Map a request path to a filesystem path located under ``self.dir``.

          Args:
              path: The request path as received by the base handler.

          Returns:
              The re-rooted filesystem path as a ``str``. The base class treats
              this value as a string (recent CPython versions call
              ``path.endswith('/')`` on it), so a ``Path`` object must not be
              returned here.

          Raises:
              ValueError: if the path resolved by the base class is not located
                  under the current working directory.
          """
          resolved = Path(super().translate_path(path))
          relative = resolved.relative_to(Path.cwd())
          return str(self.dir / relative)
  def test_import_json(dds: DDS):
      """Create a catalog and import package entries from a JSON file.

      The JSON document is written to ``data.json`` inside the build directory
      and lists two versions of package ``foo``; only the first version carries
      a ``depends`` key.
      """
      dds.scope.enter_context(ensure_dir(dds.build_dir))
      dds.catalog_create()

      remote = 'git+http://example.com#master'
      foo_versions = {
          '1.2.4': {'url': remote, 'depends': []},
          # This entry intentionally mirrors the original data: no 'depends'.
          '1.2.5': {'url': remote},
      }
      document = {'version': 2, 'packages': {'foo': foo_versions}}
      payload = json.dumps(document).encode()

      json_fpath = dds.build_dir / 'data.json'
      written = dds.set_contents(json_fpath, payload)
      dds.scope.enter_context(written)
      dds.catalog_import(json_fpath)
  @pytest.fixture
  def http_import_server():
      """Run a background HTTP server for the duration of a test.

      The server listens on ``0.0.0.0:8000`` and serves files from
      ``data/http-test-1`` below the current working directory. It is started
      on a worker thread before the test body runs and is shut down, closed
      and joined again during teardown.
      """
      handler = partial(
          DirectoryServingHTTPRequestHandler,
          dir=Path.cwd() / 'data/http-test-1')
      addr = ('0.0.0.0', 8000)
      # The executor is the outer context manager on purpose: its exit joins the
      # worker thread, which can only finish after httpd.shutdown() has stopped
      # serve_forever() in the inner block.
      with ThreadPoolExecutor(max_workers=1) as pool:
          with HTTPServer(addr, handler) as httpd:
              pool.submit(httpd.serve_forever, poll_interval=0.1)
              try:
                  yield
              finally:
                  httpd.shutdown()
  def test_import_http(dds: DDS, http_import_server):
      """Import a package archive by URL and check that it lands in the repo.

      After the ``repo ... import`` command has run, the repository directory
      must contain a ``neo-buffer@0.4.2`` directory.

      NOTE(review): ``http_import_server`` is requested so the local server
      fixture is active, yet the URL below points at github.com - confirm
      whether the fixture is still required here.
      """
      dds.repo_dir.mkdir(parents=True, exist_ok=True)

      archive_url = 'https://github.com/vector-of-bool/neo-buffer/archive/0.4.2.tar.gz?dds_strpcmp=1'
      command = ['repo', dds.repo_dir_arg, 'import', archive_url]
      dds.run(command, cwd=dds.repo_dir)

      imported_pkg = dds.repo_dir.joinpath('neo-buffer@0.4.2')
      assert imported_pkg.is_dir()