Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: ssa/workspaces

Commits on Source (2)
Showing 298 additions and 1087 deletions
......@@ -3,16 +3,23 @@
""" Module for the command line interface to data-fetcher. """
import logging
from argparse import Namespace
import os
import sys
from pathlib import Path
# pylint: disable=C0103, E0402, E0611, R0902, R0903, W0703, W1203
from typing import List, Dict
from datafetcher.errors import MissingSettingsException, NoProfileException
from datafetcher.errors import (
MissingSettingsException,
NoProfileException,
FileErrorException,
)
from datafetcher.project_fetcher import ParallelFetcher
from .locations_report import LocationsReport
from .utilities import get_arg_parser, get_capo_settings, path_is_accessible
from .utilities import parse_args, get_capo_settings, path_is_accessible
_APPLICATION_NAME = "datafetcher"
......@@ -41,40 +48,45 @@ class DataFetcher:
"""
# TODO Some Fine Day: refactor to reduce cognitive complexity
def __init__(self, args: Namespace, df_capo_settings: dict):
def __init__(self, args_in: List[str], df_capo_settings: Dict):
self.usage = self._build_usage_message()
if args is None or df_capo_settings is None:
if args_in is None or df_capo_settings is None:
raise MissingSettingsException()
self.args = args
args = parse_args(args_in)
self.settings = df_capo_settings
try:
self.verbose = self.args.verbose
self.verbose = args.verbose
except AttributeError:
# we don't care; --verbose will be dropped later in WS-179
pass
# required arguments
self.profile = args.profile
if self.profile is None:
raise NoProfileException()
if hasattr(args, "profile"):
self.profile = args.profile
else:
if "CAPO_PROFILE" in os.environ.keys():
self.profile = os.environ["CAPO_PROFILE"]
else:
raise NoProfileException("Capo profile is required")
self.output_dir = args.output_dir
if self.output_dir is None:
raise MissingSettingsException("output directory option is missing")
output_dir = Path(self.output_dir)
if not output_dir.is_dir() or not path_is_accessible(output_dir):
raise MissingSettingsException(
f"output location {self.output_dir} inaccessible or not found"
)
self.output_dir = Path(self.output_dir)
if not self.output_dir.is_dir() or not path_is_accessible(self.output_dir):
raise FileErrorException(f"output location {self.output_dir} inaccessible or not found")
if args.location_file is not None:
if args.product_locator is not None:
raise MissingSettingsException(
"required: location file OR product locator -- not both"
)
self.location_file = args.location_file
self.location_file = Path(args.location_file)
self.product_locator = None
elif args.product_locator is not None:
self.product_locator = args.product_locator
self.location_file = None
else:
raise MissingSettingsException(
"you must specify either a location file or a product locator"
......@@ -108,12 +120,16 @@ class DataFetcher:
:return:
"""
fetcher = ParallelFetcher(self.args, self.settings, self.servers_report)
fetcher = ParallelFetcher(
self.output_dir, self.is_dry, self.force, self.settings, self.servers_report
)
fetcher.run()
def _get_locations(self):
capo_settings = get_capo_settings(self.profile)
return LocationsReport(self.args, capo_settings)
if self.product_locator:
return LocationsReport(self.product_locator, capo_settings)
return LocationsReport(self.location_file, capo_settings)
def main():
......@@ -123,9 +139,16 @@ def main():
logging.basicConfig(level=logging.DEBUG)
args = get_arg_parser().parse_args()
settings = get_capo_settings(args.profile)
args = sys.argv
profile = None
if "--profile" in args:
for i in range(0, len(args)):
if args[i] == "--profile":
profile = args[i + 1]
break
if not profile:
profile = os.environ["CAPO_PROFILE"]
settings = get_capo_settings(profile)
DataFetcher(args, settings).run()
......
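Taken together, these hunks change DataFetcher's contract: it now takes the raw argument list (argv-style tokens) plus Capo settings and parses them internally via parse_args, instead of receiving a pre-parsed Namespace. A minimal sketch of the new calling convention, assuming a reachable Capo configuration; the locator, directory, and profile values are placeholders:

    from datafetcher.datafetcher import DataFetcher
    from datafetcher.utilities import get_capo_settings

    args = [
        "--product-locator", "uid://evla/execblock/some-placeholder",  # hypothetical locator
        "--output-dir", "/tmp/fetch",  # must already exist and be accessible
        "--profile", "docker",
    ]
    settings = get_capo_settings("docker")
    DataFetcher(args, settings).run()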
......@@ -33,10 +33,10 @@ class NGASFileRetriever:
and saving it to the requested location.
"""
def __init__(self, args: Namespace):
self.output_dir = args.output_dir
self.dry_run = args.dry_run
self.force_overwrite = args.force
def __init__(self, output_dir: Path, dry_run: bool, force: bool):
self.output_dir = output_dir
self.dry_run = dry_run
self.force_overwrite = force
self.fetch_attempted = False
self.num_tries = 0
......@@ -52,7 +52,7 @@ class NGASFileRetriever:
download_url = "http://" + server + "/RETRIEVE"
destination = self._get_destination(file_spec)
if destination.exists() and not self.force_overwrite and not self.dry_run:
raise FileExistsError(f"{destination} exists; aborting")
raise FileErrorException(f"{destination} exists; aborting")
self._make_basedir(destination)
......
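NGASFileRetriever likewise trades its Namespace argument for explicit parameters. A minimal construction sketch; the module path is assumed, since this diff only shows the class body:

    from pathlib import Path
    from datafetcher.file_retrievers import NGASFileRetriever  # module path assumed

    retriever = NGASFileRetriever(
        output_dir=Path("/tmp/fetch"),
        dry_run=False,  # actually write files
        force=True,     # overwrite anything already at the destination
    )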
......@@ -12,8 +12,8 @@ import copy
import http
import json
import logging
from argparse import Namespace
from typing import Dict
from pathlib import Path
from typing import Dict, Union
import requests
......@@ -25,31 +25,30 @@ from .errors import (
NoLocatorException,
MissingSettingsException,
)
from .utilities import Cluster, RetrievalMode, validate_file_spec
from .utilities import Cluster, RetrievalMode, validate_file_spec, parse_args
logger = logging.getLogger(__name__)
class LocationsReport:
""" Builds a location report """
"""
Builds a location report from specified .json locations file, or grabs
the report from archiveService using the product locator.
"""
def __init__(self, source: Union[str, Path], settings: Dict):
if isinstance(source, str):
self.product_locator = source
self.location_file = None
elif isinstance(source, Path):
self.product_locator = None
self.location_file = source
def __init__(self, args: Namespace, settings: Dict):
try:
self.verbose = args.verbose or False
except AttributeError:
# doesn't matter; verbose is going away soon
self.verbose = False
self._capture_and_validate_input(args, settings)
self._run()
def _capture_and_validate_input(self, args, settings):
if args is None:
if not self.product_locator and not self.location_file:
raise MissingSettingsException(
"arguments (locator and/or report file, destination) are required"
"either product locator or report file must be specified"
)
self.args = args
if settings is None:
raise MissingSettingsException("CAPO settings are required")
self.settings = settings
......@@ -57,10 +56,7 @@ class LocationsReport:
if not self.settings["execution_site"]:
raise MissingSettingsException("execution_site is required")
self.product_locator = args.product_locator
self.location_file = args.location_file
if not self.product_locator and not self.location_file:
raise NoLocatorException("either product locator or report file must be specified")
self._run()
def _run(self):
self.files_report = self._get_files_report()
......@@ -134,9 +130,9 @@ class LocationsReport:
:return: location report (from file, in JSON)
"""
result = dict()
if self.location_file is not None:
if self.location_file:
result = self._get_location_report_from_file()
if self.product_locator is not None:
elif self.product_locator is not None:
result = self._get_location_report_from_service()
return self._add_retrieve_method_field(result)
......
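With this change LocationsReport dispatches on the type of its source argument: a str is treated as a product locator, a Path as a local report file. A sketch of both call styles, with settings as returned by get_capo_settings and placeholder values:

    from pathlib import Path
    from datafetcher.locations_report import LocationsReport

    # a str source is looked up via the archive service as a product locator
    report = LocationsReport("uid://evla/execblock/some-placeholder", settings)

    # a Path source is read directly as a .json locations file
    report = LocationsReport(Path("/tmp/locations.json"), settings)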
""" A locations report is produced by the archive service; you give
it a product locator and it returns a dictionary of details on how
to retrieve the product's files from long term storage (NGAS): this
class handles fetching the report from the service or reading it from
a file, and has utilities to manipulate the report.
"""
# pylint: disable=C0301, E0401, E0402, W1203
import copy
import http
import json
import logging
from json import JSONDecodeError
from typing import Dict, List
import requests
from pycapo import CapoConfig
from .errors import (
LocationServiceTimeoutException,
LocationServiceRedirectsException,
LocationServiceErrorException,
NoLocatorException,
MissingSettingsException,
)
from .utilities import Cluster, RetrievalMode, validate_file_spec, get_arg_parser
# pylint: disable=C0103, R0902, R0903, R0914, W0703, W1203
logger = logging.getLogger(__name__)
REQUIRED_SETTINGS = {
"EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.LOCATORSERVICEURLPREFIX": "locator_service_url",
"EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.EXECUTIONSITE": "execution_site",
}
class LocationsReportRefactor:
""" Builds a location report """
def __init__(self, args: List[str]):
namespace = get_arg_parser().parse_args(args)
try:
self.capo_config = CapoConfig(profile=namespace.profile)
except Exception as exc:
raise MissingSettingsException("Capo profile is required") from exc
self.product_locator = namespace.product_locator
self.location_file = namespace.location_file
self.profile = namespace.profile
if not self.product_locator and not self.location_file:
raise NoLocatorException("either product locator or report file must be specified")
self._run()
def _run(self):
self.files_report = self._get_files_report()
self.servers_report = self._get_servers_report()
def _get_files_report(self):
"""Given a product locator or a path to a location file, return a
location report: an object describing the files that make up the product
and where to get them from.
If neither argument was provided, a NoLocatorException will already
have been raised during construction; if both are (for some reason),
then the location file takes precedence.
:return: location report (from file, in JSON)
"""
result = dict()
try:
if self.location_file is not None:
result = self._get_location_report_from_file()
elif self.product_locator is not None:
result = self._get_location_report_from_service()
return self._add_retrieve_method_field(result)
except JSONDecodeError as js_err:
logger.error(f"Unable to parse {self.location_file}")
raise ReportFileParseException from js_err
def _get_location_report_from_file(self) -> Dict[str, str]:
"""
Read a file at a user-provided path to pull in the location report.
:return:
"""
logger.debug(f'fetching files from report "{self.location_file}"')
try:
with open(self.location_file) as to_read:
result = json.load(to_read)
return result
except FileNotFoundError as err:
logger.error(f"{err}")
raise
except JSONDecodeError as js_err:
logger.error(f"Unable to parse {self.location_file}")
raise ReportFileParseException from js_err
except Exception as exc:
logger.error(f"Problem getting location report from f{self.location_file}: {exc}")
raise
def _get_location_report_from_service(self):
"""Use 'requests' to fetch the location report from the locator service.
:return: location report (from locator service, in JSON)
"""
url = self.capo_config.get(
"edu.nrao.archive.datafetcher.DataFetcherSettings.locatorServiceUrlPrefix"
)
logger.debug(f"fetching report from {url} for {self.product_locator}")
# this is needed to prevent SSL errors when tests are run
# inside a Docker container
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ":HIGH:!DH:!aNULL"
requests.Session().mount(url, adapter=requests.adapters.HTTPAdapter())
response = None
try:
response = requests.get(url, params={"locator": self.product_locator})
except requests.exceptions.Timeout as exc_to:
raise LocationServiceTimeoutException() from exc_to
except requests.exceptions.TooManyRedirects as exc_re:
raise LocationServiceRedirectsException() from exc_re
except requests.exceptions.RequestException as ex:
raise LocationServiceErrorException(ex) from ex
except Exception as exc:
logger.error(f"{exc}")
raise
if response.status_code == http.HTTPStatus.OK:
return response.json()
if response.status_code == http.HTTPStatus.NOT_FOUND:
raise UnknownLocatorException(f'locator "{self.product_locator}" not found')
raise LocationServiceErrorException(
"locator service returned {}".format(response.status_code)
)
def _add_retrieve_method_field(self, files_report: Dict):
"""This adds a field to the files report about whether we can do
a direct copy or we have to rely on streaming: this is something
the location service itself doesn't know because it depends on
which site data-fetcher is running on, which site has the data and
whether
the NGAS cluster supports direct copy."""
dsoc_cluster = Cluster.DSOC
exec_site = self.capo_config.getstring(
"edu.nrao.archive.datafetcher.DataFetcherSettings.executionSite"
)
for file_spec in files_report["files"]:
validate_file_spec(file_spec, False)
server = file_spec["server"]
location = server["location"]
if server["cluster"] == dsoc_cluster.value and (
location == exec_site or location == str(exec_site)
):
server["retrieve_method"] = RetrievalMode.COPY
else:
server["retrieve_method"] = RetrievalMode.STREAM
return files_report
def _get_servers_report(self) -> Dict:
"""The location report we get back looks like this, for each file:
{"ngas_file_id":"17B-197_2018_02_19_T15_59_16.097.tar",
"subdirectory":"17B-197.sb34812522.eb35115211.58168.58572621528",
"relative_path":"17B-197_2018_02_19_T15_59_16.097.tar",
"checksum":"-1848259250",
"checksum_type":"ngamsGenCrc32",
"version":1,
"size":108677120,
"server":{"server":"nmngas01.aoc.nrao.edu:7777",
"location":"DSOC",
"cluster":"DSOC"
}}
Re-organize it to group files under servers so it is more useful.
"""
result = {}
for file_spec in self.files_report["files"]:
validate_file_spec(file_spec, True)
new_f = copy.deepcopy(file_spec)
del new_f["server"]
server = file_spec["server"]
server_host = server["server"]
if server_host not in result:
result[server_host] = dict()
result[server_host]["location"] = server["location"]
result[server_host]["cluster"] = server["cluster"]
result[server_host]["retrieve_method"] = server["retrieve_method"]
result[server_host]["files"] = list()
result[server_host]["files"].append(new_f)
return result
class ReportFileParseException(Exception):
"""Throw this when we're unable to parse a .json """
class UnknownLocatorException(Exception):
"""Throw this when we get a product locator we don't know how to handle"""
......@@ -21,21 +21,27 @@ logger = logging.getLogger(__name__)
class BaseFetcher:
""" This is a base class for fetchers. """
def __init__(self, args: Namespace, df_capo_settings: dict, servers_report: dict):
self.args = args
self.output_dir = self.args.output_dir
self.force_overwrite = args.force
self.dry_run = args.dry_run
def __init__(
self,
output_dir: Path,
dry_run: bool,
force: bool,
df_capo_settings: dict,
servers_report: dict,
):
self.output_dir = output_dir
self.force_overwrite = force
self.dry_run = dry_run
self.servers_report = servers_report
self.settings = df_capo_settings
self.ngas_retriever = NGASFileRetriever(self.args)
self.ngas_retriever = NGASFileRetriever(self.output_dir, self.dry_run, self.force_overwrite)
self.retrieved = []
self.num_files_retrieved = 0
def retrieve_files(self, server, retrieve_method, file_specs):
""" This is the part where we actually fetch the files. """
retriever = NGASFileRetriever(self.args)
retriever = NGASFileRetriever(self.output_dir, self.dry_run, self.force_overwrite)
num_files = len(file_specs)
count = 0
......@@ -58,8 +64,15 @@ class SerialFetcher(BaseFetcher):
"""
def __init__(self, args: Namespace, df_capo_settings: Dict, servers_report: Dict):
super().__init__(args, df_capo_settings, servers_report)
def __init__(
self,
output_dir: Path,
dry_run: bool,
force: bool,
df_capo_settings: dict,
servers_report: dict,
):
super().__init__(output_dir, dry_run, force, df_capo_settings, servers_report)
def run(self):
""" fetch 'em """
......@@ -78,8 +91,15 @@ class SerialFetcher(BaseFetcher):
class ParallelFetcher(BaseFetcher):
""" Pull the files out in parallel; try to be clever about it. """
def __init__(self, args: Namespace, df_capo_settings: dict, servers_report: dict):
super().__init__(args, df_capo_settings, servers_report)
def __init__(
self,
output_dir: Path,
dry_run: bool,
force: bool,
df_capo_settings: dict,
servers_report: dict,
):
super().__init__(output_dir, dry_run, force, df_capo_settings, servers_report)
self.num_files_expected = self._count_files_expected()
self.bucketized_files = self._bucketize_files()
......@@ -140,7 +160,7 @@ class ParallelFetcher(BaseFetcher):
def run(self):
""" Fetch all the files for the product locator """
if self.args.dry_run:
if self.dry_run:
logger.debug("This is a dry run; files will not be fetched")
with ThreadPoolExecutor() as executor:
......@@ -159,7 +179,7 @@ class ParallelFetcher(BaseFetcher):
# (This error sometimes gets thrown after all files
# actually -have- been retrieved. I blame the NGAS API. - JLG)
output_path = Path(self.args.output_dir)
output_path = Path(self.output_dir)
files = [
file
for file in output_path.rglob("*")
......
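The fetcher constructors now spell out each former Namespace field in a fixed order. A construction sketch matching the new ParallelFetcher signature, with settings and report produced as elsewhere in this diff:

    from pathlib import Path
    from datafetcher.project_fetcher import ParallelFetcher

    fetcher = ParallelFetcher(
        output_dir=Path("/tmp/fetch"),
        dry_run=False,
        force=False,
        df_capo_settings=settings,
        servers_report=report.servers_report,
    )
    fetcher.run()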
......@@ -13,7 +13,8 @@ import logging
import os
import pathlib
import time
from typing import Callable
from pathlib import Path
from typing import Callable, List
# pylint:disable=C0301, C0303, C0415, E0401, E0402, R0903, W0212, W1202, W0404, W0621, W1203
......@@ -26,6 +27,7 @@ from .errors import (
NGASServiceErrorException,
SizeMismatchException,
TERMINAL_ERROR_CODES,
FileErrorException,
)
LOG_FORMAT = "%(module)s.%(funcName)s, %(lineno)d: %(message)s"
......@@ -123,14 +125,6 @@ def get_arg_parser():
default=cwd,
help="output directory, default current dir",
)
# optional_group.add_argument(
# "--verbose",
# action="store_true",
# required=False,
# dest="verbose",
# default=False,
# help="make a lot of noise",
# )
optional_group.add_argument(
"--force",
action="store_true",
......@@ -327,3 +321,63 @@ class RetrievalMode(Enum):
STREAM = "stream"
COPY = "copy"
def parse_args(args: List[str]):
"""
Wrapper around parser.parse_args() so that, in the event of a bad
argument, an exception is thrown rather than sys.exit() being called.
:param args:
:return:
"""
confirm_complete_args(args)
to_parse = [str(arg) for arg in args]
return get_arg_parser().parse_args(to_parse)
def confirm_complete_args(args: List[str]):
"""
Let's scrutinize the args -before- calling parse_args()
so we can differentiate among errors.
:param args:
:return:
"""
# we must have a profile
if "--profile" not in args and "CAPO_PROFILE" not in os.environ.keys():
raise NoProfileException("Capo profile is required.")
# we must have an output dir....
if "--output-dir" not in args:
raise MissingSettingsException("output dir is required")
# ... and it must exist
args_iter = iter(args)
args_dict = dict(zip(args_iter, args_iter))
output_dir_arg = args_dict["--output-dir"]
output_dir = Path(output_dir_arg)
if not output_dir.exists():
# did they just forget to specify it?
if output_dir_arg.startswith("--"):
raise MissingSettingsException("--output-dir is required")
raise FileErrorException(f"output dir '{output_dir_arg}' not found")
if not path_is_accessible(output_dir):
raise FileNotFoundError(f"permission denied on {output_dir}")
# we must have EITHER a product locator OR a locations file....
if "--product-locator" not in args_dict.keys() and "--location-file" not in args_dict.keys():
raise MissingSettingsException("either product locator or locations file required")
if "--product-locator" in args_dict.keys() and "--location-file" in args_dict.keys():
raise MissingSettingsException("product locator OR locations file required -- not both")
if "--product-locator" in args:
product_locator = args_dict["--product-locator"]
assert product_locator
else:
locations_file = args_dict["--location-file"]
if not Path(locations_file).exists():
raise FileNotFoundError(f"{locations_file} not found")
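The iter/zip idiom in confirm_complete_args pairs each flag with the token that follows it. A quick illustration; note that it assumes flags and values strictly alternate, so a bare boolean flag such as --dry-run would shift the pairing:

    args = ["--profile", "docker", "--output-dir", "/tmp/fetch"]
    args_iter = iter(args)
    args_dict = dict(zip(args_iter, args_iter))
    # {'--profile': 'docker', '--output-dir': '/tmp/fetch'}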
""" Utilities for passing arguments to datafetcher in tests """
import os
from pathlib import Path
from typing import List
from datafetcher.errors import MissingSettingsException, NoProfileException
from datafetcher.utilities import get_arg_parser, path_is_accessible
def parse_args(args: List[str]):
"""
Wrapper around parser.parse_args() so that, in the event of a bad
argument, an exception is thrown rather than sys.exit() being called.
:param args:
:return:
"""
confirm_complete_args(args)
return get_arg_parser().parse_args(args)
def confirm_complete_args(args: List[str]):
"""
Let's scrutinize the args -before- calling parse_args()
so we can differentiate among errors.
:param args:
:return:
"""
# we must have a profile
assert "--profile" in args or "CAPO_PROFILE" in os.environ
if "--profile" not in args and "CAPO_PROFILE" in os.environ:
raise NoProfileException("Capo profile is required.")
# we must have an output dir....
assert "--output-dir" in args
# ... and it must exist
args_iter = iter(args)
args_dict = dict(zip(args_iter, args_iter))
output_dir_arg = args_dict["--output-dir"]
output_dir = Path(output_dir_arg)
if not output_dir.exists():
# did they just forget to specify it?
if output_dir_arg.startswith("--"):
raise MissingSettingsException("--output-dir is required")
raise FileNotFoundError(f"output dir '{output_dir_arg}' not found")
if not path_is_accessible(output_dir):
raise FileNotFoundError(f"permission denied on {output_dir}")
# we must have EITHER a product locator OR a locations file....
if "--product-locator" not in args_dict.keys() and "--location-file" not in args_dict.keys():
raise MissingSettingsException("either product locator or locations file required")
if "--product-locator" in args_dict.keys() and "--location-file" in args_dict.keys():
raise MissingSettingsException("product locator OR locations file required -- not both")
if "--product-locator" in args:
product_locator = args_dict["--product-locator"]
assert product_locator
else:
locations_file = args_dict["--location-file"]
if not Path(locations_file).exists():
raise FileNotFoundError(f"{locations_file} not found")
......@@ -5,18 +5,17 @@
import json
import os
import sys
import tempfile
from pathlib import Path
from typing import List, Dict
# pylint: disable=C0115, C0116, C0200, C0411, E0401, E0402, R0902, R0903, R0914, R1721
# pylint: disable=W0212, W0613, W0621, W0703, W1203
import pytest
from pycapo import CapoConfig
# pylint: disable=C0115, C0116, C0200, R0902, R0903, R0914, R1721, W0212, W0613,
# pylint: disable=W0621, W0703, W1203
from .df_testdata_utils import (
get_locations_report,
get_test_data_dir,
......@@ -27,10 +26,10 @@ from datafetcher.errors import MissingSettingsException, NoProfileException
from datafetcher.locations_report import LocationsReport
from datafetcher.utilities import (
REQUIRED_SETTINGS,
get_arg_parser,
ExecutionSite,
ProductLocatorLookup,
RetrievalMode,
parse_args,
)
TEST_PROFILE = "docker"
......@@ -137,30 +136,6 @@ def get_mini_locations_file(destination):
return destination
def get_filenames_for_locator(product_locator: str, settings: dict):
"""
For a given product locators, return names of all the files
in its locations report's files report
:param product_locator:
:param settings:
:return:
"""
args = [
"--product-locator",
product_locator,
"--profile",
TEST_PROFILE,
"--output-dir",
None,
]
namespace = get_arg_parser().parse_args(args)
locations_report = LocationsReport(None, namespace, settings)
return [file["relative_path"] for file in locations_report.files_report["files"]]
def find_newest_fetch_log_file(target_dir: Path):
"""
Data-fetcher command line was executed, perhaps more than once;
......@@ -266,8 +241,7 @@ def settings(capo_settings):
:param capo_settings:
:return:
"""
""" g
"""
db_settings = get_metadata_db_settings(TEST_PROFILE)
test_data = _initialize_test_data(db_settings)
yield Settings(capo_settings, db_settings, test_data)
......@@ -305,9 +279,10 @@ def launch_datafetcher(args: list, df_capo_settings: dict) -> int:
if args is None or len(args) == 0:
raise MissingSettingsException
namespace = evaluate_args_and_capo(args, df_capo_settings)
datafetcher = DataFetcher(namespace, df_capo_settings)
# namespace = evaluate_args_and_capo(args, df_capo_settings)
datafetcher = DataFetcher(args, df_capo_settings)
datafetcher.run()
return datafetcher
def evaluate_args_and_capo(args: List[str], capo_settings: Dict[str, str]):
......@@ -320,10 +295,9 @@ def evaluate_args_and_capo(args: List[str], capo_settings: Dict[str, str]):
profile = capo_settings["profile"]
if profile is None:
raise NoProfileException
else:
args["profile"] = profile
args["--profile"] = profile
return get_arg_parser().parse_args(args)
return parse_args(args)
def get_profile_from_args(args: list) -> str:
......
""" Let's not actually hit the archive service """
import logging
import sys
from pathlib import Path
from datafetcher.locations_report_refactor import LocationsReportRefactor, UnknownLocatorException
# pylint: disable=C0301, R0903
_LOG = logging.getLogger(__name__)
PROFILE = "docker"
REPORT_FILES = {
"vla_eb": "17A-109_fg_18468.json",
"calibration": "CALIBRATION.json",
"image": "IMG.json",
"vlba": "VLBA_EB.json",
"gbt": "AGBT17B_044_02.json",
# "alma": "A001_X1296_Xa93_RAW.json",
"empty": "EMPTY.json",
"not_json": "NOT_JSON.json",
"vla_bad_server": "VLA_BAD_SERVER.json",
"vla_eb_busted": "VLA_SMALL_EB_BUSTED.json",
}
class FakeArchiveService:
""" Stand-in for archive service """
def __init__(self, product_locator: str):
"""
Return locations report for given locator
:param product_locator:
"""
self.product_locator = product_locator
self.data_dir = get_test_data_dir()
def get_locations_report(self) -> LocationsReportRefactor:
"""
Depending on the product locator, return locations report for a VLA EB, an image, a VLBA product, etc.
:return: the location report
"""
if "vlba" in self.product_locator:
return self._vlba_locations_report()
elif "calibration" in self.product_locator:
return self._calibration_locations_report()
elif "image" in self.product_locator:
return self._image_locations_report()
elif "alma" in self.product_locator:
return self._alma_locations_report()
elif "gbt" in self.product_locator:
return self._gbt_locations_report()
elif "evla/execblock" in self.product_locator:
return self._vla_eb_locations_report()
raise UnknownLocatorException
def _vla_eb_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLA EB locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["vla_eb"]
return self._get_location_report_from_file(file)
def _calibration_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLA calibrations locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["calibration"]
return self._get_location_report_from_file(file)
def _image_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLASS image locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["image"]
return self._get_location_report_from_file(file)
def _vlba_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLBA locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["vlba"]
return self._get_location_report_from_file(file)
def _gbt_locations_report(self) -> LocationsReportRefactor:
"""
Read in a GBT locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["gbt"]
return self._get_location_report_from_file(file)
def _alma_locations_report(self) -> LocationsReportRefactor:
"""
Read in an ALMA EB locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["alma"]
return self._get_location_report_from_file(file)
@staticmethod
def _get_location_report_from_file(location_file: Path) -> LocationsReportRefactor:
"""
Read a .json file into a LocationsReport.
:param location_file: Path to file
:return: the locations report
"""
args = ["--location-file", str(location_file), "--profile", PROFILE]
return LocationsReportRefactor(args)
def get_test_data_dir() -> Path:
"""
Find the .json locations report files we use for testing.
:return:
"""
top_level_subdirs = sys.path
shared_ws_src = None
for pathname in top_level_subdirs:
# test data will be a few levels under shared_wksp
if "shared/workspaces" in pathname:
shared_ws_src = pathname
break
shared_wksp = Path(shared_ws_src).parent
for item in shared_wksp.rglob("location_files"):
assert item.is_dir()
return item
return None
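FakeArchiveService keys off substrings in the locator to choose a canned report. A usage sketch; this locator matches the evla/execblock branch above:

    service = FakeArchiveService("uid://evla/execblock/123a-456-b-789z")
    report = service.get_locations_report()
    assert report.files_report is not None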
""" for testing the attempt to copy rather than stream files """
import logging
from argparse import Namespace
# pylint: disable=C0103, C0301, E0401, E0402, R0201, R0902, R0903, W0621
from datafetcher.locations_report import LocationsReport
from datafetcher.project_fetcher import ParallelFetcher
from datafetcher.utilities import get_capo_settings, ExecutionSite
from datafetcher.errors import MissingSettingsException
from .df_pytest_utils import TEST_PROFILE
logger = logging.getLogger(__name__)
class MockProdDataFetcher:
""" Creates and launches a datafetcher using the dsoc-prod profile """
def __init__(self, args: Namespace, settings: dict):
if args is None or settings is None:
raise MissingSettingsException()
self.args = args
self.settings = settings
self.output_dir = args.output_dir
self.profile = args.profile
self.locations_report = self._get_locations()
self.servers_report = self.locations_report.servers_report
def _get_locations(self):
"""
Create a locations report with DSOC as exec site
to force copy rather than stream
:return:
"""
capo_settings = get_capo_settings(TEST_PROFILE)
capo_settings["execution_site"] = ExecutionSite.DSOC.value
return LocationsReport(self.args, capo_settings)
def run(self):
"""
identical to DataFetcher.run()
:return:
"""
return ParallelFetcher(self.args, self.settings, self.servers_report).run()
......@@ -2,26 +2,20 @@
from pathlib import Path
# pylint: disable=C0115, C0116, C0200, C0415, R0801, R0902, R0903, R0914, R1721
# pylint: disable=W0106, W0212, W0611, W0613, W0621, W0703, W1203
# pylint: disable=E0401, E0402, W0511
import pytest
from datafetcher.errors import (
NGASFetchError,
MissingSettingsException,
NGASServiceErrorException,
FileErrorException,
)
# pylint: disable=C0115, C0116, C0200, C0415, R0801, R0902, R0903, R0914, R1721,
# pylint: disable=W0212, W0611, W0613, W0621, W0703, W1203
from datafetcher.datafetcher import DataFetcher
from datafetcher.utilities import (
get_arg_parser,
ProductLocatorLookup,
RetrievalMode,
Location,
Cluster,
)
from datafetcher.utilities import ProductLocatorLookup, parse_args
from .test_df_return_status import try_to_launch_df
......@@ -49,16 +43,6 @@ _EB_EXTERNAL_NAME = "sysstartS.58955.83384832176"
print(f">>> RUNNING ALL TESTS: {RUN_ALL}")
def test_settings_setup(settings):
""" Ensure that the test settings we're using make sense """
assert capo_settings is not None
assert isinstance(settings.capo_settings, dict)
assert settings.db_settings is not None
assert isinstance(settings.db_settings, dict)
assert settings.test_data is not None
assert isinstance(settings.test_data, dict)
@pytest.mark.skipif(not RUN_ALL, reason="debug")
def test_nothing_retrieved_if_dry_locator(make_tempdir, settings):
""" Simulates dry run with product locator """
......@@ -156,28 +140,22 @@ def test_no_overwrite_without_force(make_tempdir, capo_settings):
args = [
"--location-file",
str(location_file),
location_file,
"--profile",
TEST_PROFILE,
"--output-dir",
str(top_level),
]
with pytest.raises(FileExistsError):
with pytest.raises(FileErrorException):
try_to_launch_df(capo_settings, args)
retrieved = [
file for file in top_level.rglob("*") if file.is_file() and not str(file).endswith(".json")
]
num_not_too_big_files_expected = 28
assert len(retrieved) == num_not_too_big_files_expected
# get rid of all the files we downloaded, plus the log
deleted = [
file.unlink() for file in retrieved if file.is_file() and not str(file).endswith(".json")
]
assert len(deleted) >= num_not_too_big_files_expected
[file.unlink() for file in retrieved if file.is_file() and not str(file).endswith(".json")]
@pytest.mark.skipif(not RUN_ALL, reason="debug")
......@@ -216,7 +194,7 @@ def test_copy_attempt_throws_fetch_error(make_tempdir, settings):
"--profile",
prod_profile,
]
namespace = get_arg_parser().parse_args(args)
namespace = parse_args(args)
datafetcher = DataFetcher(namespace, settings.capo_settings)
servers_report = datafetcher.servers_report
......@@ -256,170 +234,6 @@ def test_dies_with_bad_server_info(make_tempdir, settings):
launch_datafetcher(args, settings.capo_settings)
@pytest.mark.skipif(not RUN_ALL, reason="debug")
def test_missing_setting_exc_on_bad_destination(settings):
args = [
"--profile",
TEST_PROFILE,
"--product-locator",
settings.test_data["product_locator"],
"--output-dir",
"floob",
]
with pytest.raises(MissingSettingsException):
launch_datafetcher(args, settings.capo_settings)
@pytest.mark.skip("takes too long; re-enable with WS-179-1")
def test_gets_vlbas_from_report_file(make_tempdir, capo_settings):
location_file = get_locations_file("VLBA_EB")
args = [
"--profile",
TEST_PROFILE,
"--output-dir",
str(make_tempdir),
"--location-file",
str(location_file),
]
datafetcher = DataFetcher(get_arg_parser().parse_args(args), capo_settings)
servers_report = datafetcher.servers_report
assert len(servers_report) == 1
datafetcher.run()
dest_dir = Path(make_tempdir)
file_info_dict = dict()
for server in servers_report.items():
assert server[0] == "nmngas03.aoc.nrao.edu:7777"
values = server[1]
assert values["location"] == Location.DSOC.value
assert values["cluster"] == Cluster.DSOC.value
assert values["retrieve_method"] == RetrievalMode.STREAM
file_values = values["files"]
assert len(file_values) == 16
for filename in file_values:
write_fake_file(dest_dir, filename)
file_info_dict[filename["ngas_file_id"]] = filename
datafetcher = DataFetcher(get_arg_parser().parse_args(args), settings.capo_settings)
datafetcher.run()
for filename in file_info_dict:
path = Path(dest_dir, filename)
assert path.is_file()
contents = path.read_text().strip()
assert int(contents) == file_info_dict[filename]["size"]
@pytest.mark.skip("takes too long; re-enable with WS-179-1")
def test_gets_large_vla_ebs_from_report_file(make_tempdir, capo_settings):
location_file = get_locations_file("VLA_SMALL_EB")
args = [
"--profile",
TEST_PROFILE,
"--output-dir",
str(make_tempdir),
"--location-file",
str(location_file),
]
datafetcher = DataFetcher(get_arg_parser().parse_args(args), capo_settings)
servers_report = datafetcher.servers_report
assert len(servers_report) == 2
datafetcher.run()
server_file_count = {
"nmngas03.aoc.nrao.edu:7777": 0,
"nmngas04.aoc.nrao.edu:7777": 0,
}
dest_dir = Path(make_tempdir)
file_list = list()
for server in servers_report.items():
server_url = server[0]
assert server_url in server_file_count.keys()
values = server[1]
assert values["location"] == Location.DSOC.value
assert values["cluster"] == Cluster.DSOC.value
assert values["retrieve_method"] == RetrievalMode.STREAM
file_values = values["files"]
server_file_count[server_url] += len(file_values)
for filename in file_values:
write_fake_file(dest_dir, filename)
file_list.append(values)
assert server_file_count["nmngas03.aoc.nrao.edu:7777"] == 3
assert server_file_count["nmngas04.aoc.nrao.edu:7777"] == 41
datafetcher = DataFetcher(get_arg_parser().parse_args(args), capo_settings)
datafetcher.run()
found_count = 0
for file_info in file_list:
for file in file_info["files"]:
filename = file["ngas_file_id"]
path = Path(dest_dir, filename)
assert path.is_file()
contents = path.read_text().strip()
assert int(contents) == file["size"]
found_count += 1
assert found_count == len(file_list)
@pytest.mark.skip("takes too long; re-enable with WS-179-1")
def test_gets_images_from_report_file(make_tempdir, capo_settings):
location_file = get_locations_file("IMG")
args = [
"--profile",
TEST_PROFILE,
"--output-dir",
str(make_tempdir),
"--location-file",
str(location_file),
]
datafetcher = DataFetcher(get_arg_parser().parse_args(args), capo_settings)
servers_report = datafetcher.servers_report
assert len(servers_report) == 2
server_file_count = {
"nmngas01.aoc.nrao.edu:7777": 0,
"nmngas02.aoc.nrao.edu:7777": 0,
}
dest_dir = Path(make_tempdir)
file_list = list()
for server in servers_report.items():
server_url = server[0]
assert server_url in server_file_count.keys()
values = server[1]
assert values["location"] == Location.DSOC.value
assert values["cluster"] == Cluster.DSOC.value
assert values["retrieve_method"] == RetrievalMode.STREAM
file_values = values["files"]
server_file_count[server_url] += len(file_values)
for filename in file_values:
write_fake_file(dest_dir, filename)
file_list.append(values)
for server_url, count in server_file_count.items():
assert count == 1
datafetcher.run()
found_count = 0
for file_info in file_list:
for file in file_info["files"]:
filename = file["ngas_file_id"]
path = Path(dest_dir, filename)
assert path.is_file()
contents = path.read_text().strip()
assert int(contents) == file["size"]
found_count += 1
assert found_count == len(file_list)
@pytest.mark.skipif(not RUN_ALL, reason="debug")
def test_gets_calibration_from_report_file(make_tempdir):
location_file = get_locations_file("CALIBRATION")
......@@ -427,11 +241,11 @@ def test_gets_calibration_from_report_file(make_tempdir):
"--profile",
TEST_PROFILE,
"--output-dir",
str(make_tempdir),
make_tempdir,
"--location-file",
str(location_file),
location_file,
]
datafetcher = DataFetcher(get_arg_parser().parse_args(args), capo_settings)
datafetcher = DataFetcher(args, capo_settings)
servers_report = datafetcher.servers_report
assert len(servers_report) == 1
......@@ -460,12 +274,11 @@ def test_gets_calibration_from_locator(make_tempdir, settings):
"--product-locator",
product_locator,
"--output-dir",
str(make_tempdir),
make_tempdir,
"--profile",
TEST_PROFILE,
]
namespace = get_arg_parser().parse_args(args)
fetch = DataFetcher(namespace, settings.capo_settings)
fetch = DataFetcher(args, settings.capo_settings)
report_files = fetch.locations_report.files_report["files"]
assert len(report_files) == 1
......@@ -483,40 +296,6 @@ def test_gets_calibration_from_locator(make_tempdir, settings):
assert int(contents) == file_spec["size"]
@pytest.mark.skipif(not RUN_ALL, reason="debug")
def test_gets_gbt_data_from_locator(make_tempdir, settings):
""" Can we cope with GBT data? """
external_name = "AGBT17B_044_553492"
product_locator = ProductLocatorLookup(settings.db_settings).look_up_locator_for_ext_name(
external_name
)
args = [
"--product-locator",
product_locator,
"--output-dir",
str(make_tempdir),
"--profile",
TEST_PROFILE,
]
namespace = get_arg_parser().parse_args(args)
fetch = DataFetcher(namespace, settings.capo_settings)
report_files = fetch.locations_report.files_report["files"]
assert len(report_files) == 1
file_spec = report_files[0]
relative_path = file_spec["relative_path"]
assert relative_path == "AGBT17B_044_01.tar"
destination = Path(make_tempdir) / relative_path
destination.mkdir()
write_fake_file(destination, file_spec)
fake_file = Path(destination, file_spec["ngas_file_id"])
assert fake_file.is_file()
contents = fake_file.read_text().strip()
assert int(contents) == file_spec["size"]
def write_fake_file(destination: Path, file_info: dict):
filename = file_info["ngas_file_id"]
path = Path(destination, filename)
......
......@@ -20,17 +20,16 @@ from datafetcher.errors import (
NGASFetchError,
NoLocatorException,
SizeMismatchException,
FileErrorException,
)
from datafetcher.datafetcher import DataFetcher
from datafetcher.utilities import (
get_arg_parser,
ProductLocatorLookup,
path_is_accessible,
)
# N.B. IntelliJ doesn't recognize imported fixtures as being in use;
# don't let these imports (make_tempdir, settings) get removed.
from .df_pytest_arg_utils import parse_args
from .df_pytest_utils import (
TEST_PROFILE,
......@@ -40,12 +39,8 @@ from .df_pytest_utils import (
settings,
launch_datafetcher,
RUN_ALL,
confirm_retrieve_mode_copy,
evaluate_args_and_capo,
)
from .mock_data_fetcher import MockProdDataFetcher
def test_launch_df(make_tempdir, settings):
"""
......@@ -59,7 +54,7 @@ def test_launch_df(make_tempdir, settings):
"--product-locator",
settings.test_data["product_locator"],
"--output-dir",
str(make_tempdir),
make_tempdir,
"--profile",
TEST_PROFILE,
"--dry-run",
......@@ -109,7 +104,7 @@ def test_omitted_profile_throws_no_profile_exc(make_tempdir, settings):
"--product-locator",
settings.test_data["product_locator"],
"--output-dir",
str(make_tempdir),
make_tempdir,
]
try:
......@@ -154,7 +149,7 @@ def test_invalid_capo_profile_raises_no_profile_exc(make_tempdir, settings):
"--product-locator",
locator,
"--output-dir",
str(make_tempdir),
make_tempdir,
]
with pytest.raises(MissingSettingsException):
try_to_launch_df(settings.capo_settings, args)
......@@ -191,7 +186,7 @@ def test_inaccessible_output_dir_throws_file_not_found(settings):
str(tmpdir),
]
with pytest.raises(FileNotFoundError):
with pytest.raises(FileErrorException):
try_to_launch_df(settings.capo_settings, args)
......@@ -212,9 +207,9 @@ def test_locator_plus_file_throws_missing_setting(make_tempdir, capo_settings):
"--product-locator",
"a_locator",
"--location-file",
"location.json",
Path("location.json"),
"--output-dir",
str(make_tempdir),
make_tempdir,
"--profile",
TEST_PROFILE,
]
......@@ -239,7 +234,7 @@ def test_locator_service_timeout_returns_expected_code(monkeypatch, settings, ma
"--product-locator",
settings.test_data["product_locator"],
"--output-dir",
str(make_tempdir),
make_tempdir,
"--profile",
TEST_PROFILE,
]
......@@ -263,7 +258,7 @@ def test_too_many_service_redirects_returns_expected_code(monkeypatch, settings,
"--product-locator",
settings.test_data["product_locator"],
"--output-dir",
str(make_tempdir),
make_tempdir,
"--profile",
TEST_PROFILE,
]
......@@ -287,7 +282,7 @@ def test_catastrophic_service_error_returns_expected_code(monkeypatch, settings,
"--product-locator",
settings.test_data["product_locator"],
"--output-dir",
str(make_tempdir),
make_tempdir,
"--profile",
TEST_PROFILE,
]
......@@ -297,46 +292,13 @@ def test_catastrophic_service_error_returns_expected_code(monkeypatch, settings,
launch_datafetcher(args, settings.capo_settings)
@pytest.mark.skipif(not RUN_ALL, reason="debug")
def test_copy_attempt_raises_ngas_fetch_error(make_tempdir, settings):
args = [
"--product-locator",
settings.test_data["product_locator"],
"--output-dir",
str(make_tempdir),
"--profile",
TEST_PROFILE,
]
namespace = get_arg_parser().parse_args(args)
fetcher = MockProdDataFetcher(namespace, settings.capo_settings)
servers_report = fetcher.servers_report
confirm_retrieve_mode_copy(servers_report)
# let's try just one file so we're not sitting here all day
a_server = None
for server in servers_report:
entry = servers_report[server]
servers_report = {server: entry}
fetcher.servers_report = servers_report
assert fetcher.servers_report[server] is not None
a_server = server
break
files = fetcher.servers_report[a_server]["files"]
fetcher.servers_report[a_server]["files"] = [files[0]]
with pytest.raises(NGASFetchError):
fetcher.run()
@pytest.mark.skipif(not RUN_ALL, reason="debug")
def test_product_locator_not_found_returns_expected_code(make_tempdir, settings):
args = [
"--product-locator",
"not_a_locator",
"--output-dir",
str(make_tempdir),
make_tempdir,
"--profile",
TEST_PROFILE,
]
......@@ -348,9 +310,9 @@ def test_product_locator_not_found_returns_expected_code(make_tempdir, settings)
def test_unable_to_open_location_file_throws_file_not_found(make_tempdir, capo_settings):
args = [
"--location-file",
"location.json",
Path("location.json"),
"--output-dir",
str(make_tempdir),
make_tempdir,
"--profile",
TEST_PROFILE,
]
......@@ -375,7 +337,7 @@ def test_error_fetching_file_from_ngas_returns_expected_code(monkeypatch, settin
"--product-locator",
settings.test_data["product_locator"],
"--output-dir",
str(make_tempdir),
make_tempdir,
"--profile",
TEST_PROFILE,
]
......@@ -401,7 +363,7 @@ def test_unexpected_size_returns_expected_code(monkeypatch, settings, make_tempd
"--product-locator",
settings.test_data["product_locator"],
"--output-dir",
str(make_tempdir),
make_tempdir,
"--profile",
TEST_PROFILE,
]
......@@ -411,7 +373,5 @@ def test_unexpected_size_returns_expected_code(monkeypatch, settings, make_tempd
def try_to_launch_df(capo_settings, args: List[str]):
parse_args(args)
namespace = evaluate_args_and_capo(args, capo_settings)
fetcher = DataFetcher(namespace, capo_settings)
fetcher = DataFetcher(args, capo_settings)
return fetcher.run()
""" Tests to confirm our fake archive service functions as intended """
# pylint: disable=E0402, W0511
from datafetcher.locations_report_refactor import LocationsReportRefactor
from .fake_archive_service import FakeArchiveService
# TODO: move this to appropriate DF test module
# def test_gets_df_settings():
# capo = CapoConfig(profile=PROFILE)
# key = "edu.nrao.archive.datafetcher.DataFetcherSettings"
# field = "locatorServiceUrlPrefix"
# to_get = key + "." + field
# try:
# url = capo.get(to_get)
# except KeyError as k_err:
# raise MissingSettingsException(f'missing required setting "{field}"') from k_err
# assert "locator" in url
#
#
def test_service_returns_vla_eb_locations():
"""
Does FakeArchiveService return the VLA EB locations report in
test_data?
:return:
"""
found = False
report = FakeArchiveService("uid://evla/execblock/123a-456-b-789z").get_locations_report()
assert len(report.servers_report) == 2
assert len(report.files_report["files"]) == 79
expected_ngas_id = "17A-109_2017_02_11_T18_49_09.756.tar"
rel_path = None
for file_spec in report.files_report["files"]:
if file_spec["ngas_file_id"] == expected_ngas_id:
found = True
rel_path = file_spec["relative_path"]
break
assert found
assert rel_path == expected_ngas_id
def test_service_returns_vlba_locations():
"""
Does FakeArchiveService return the VLBA locations report in
test_data?
:return:
"""
report = FakeArchiveService("uid://evla/vlba/uuid-du-jour31").get_locations_report()
assert len(report.servers_report) == 1
assert len(report.files_report["files"]) == 16
expected_agg_size = 2140560000
actual_agg_size = report.files_report["aggregate_size"]
assert actual_agg_size == expected_agg_size
def test_service_returns_cal_locations():
"""
Does FakeArchiveService return the calibrations locations report in
test_data?
:return:
"""
found = False
report = FakeArchiveService("uid://evla/calibration/different-uuid").get_locations_report()
assert len(report.servers_report) == 1
assert len(report.files_report) == 2
expected_subdir = "18B-265_2019_12_10_T00_00_59.203.tar"
for file_spec in report.files_report["files"]:
if file_spec["subdirectory"] == expected_subdir:
found = True
break
assert found
def test_service_returns_image_locations():
"""
Does FakeArchiveService return the image locations report in test_data?
:return:
"""
report = FakeArchiveService("uid://evla/image/some-uuid").get_locations_report()
assert isinstance(report, LocationsReportRefactor)
assert len(report.servers_report) == 2
assert len(report.files_report) == 2
expected_file_id = "uid____evla_image_56a10be7-f1c2-4788-8651-6ecc5bfbc2f1.fits"
found = False
for file_spec in report.files_report["files"]:
if file_spec["ngas_file_id"] == expected_file_id:
found = True
break
assert found
""" Unit tests for LocationsReportRefactor """
# pylint: disable=W0511, E0401, E0402
import pytest
from datafetcher.locations_report_refactor import (
LocationsReportRefactor,
ReportFileParseException,
UnknownLocatorException,
)
from .fake_archive_service import REPORT_FILES, get_test_data_dir
PROFILE = "docker"
def test_service_returns_expected_report():
"""
Does FakeArchiveService return a valid locations report?
:return:
"""
product_locator = "uid://evla/image/7a546de2-ab1f-4915-a8dc-94f572f9215c"
args = ["--product-locator", product_locator, "--profile", PROFILE]
report = LocationsReportRefactor(args)
assert report.files_report is not None
assert len(report.servers_report) == 2
assert len(report.files_report) == 2
def test_empty_file_fails_as_expected():
"""
When we attempt to create a locations report from a blank
.json file, do we get the expected error?
:return:
"""
file = get_test_data_dir() / REPORT_FILES["empty"]
args = ["--location-file", str(file), "--profile", PROFILE]
with pytest.raises(ReportFileParseException):
LocationsReportRefactor(args)
def test_bad_json_fails_as_expected():
"""
When we attempt to create a locations report from a badly-formed
.json file, do we get the expected error?
:return:
"""
file = get_test_data_dir() / REPORT_FILES["not_json"]
args = ["--location-file", str(file), "--profile", PROFILE]
with pytest.raises(ReportFileParseException):
LocationsReportRefactor(args)
def test_bad_locator_fails_as_expected():
"""
An invalid locator should throw UnknownLocatorException
:return:
"""
product_locator = "uid://this/is/b00000gus"
args = ["--product-locator", product_locator, "--profile", PROFILE]
with pytest.raises(UnknownLocatorException):
LocationsReportRefactor(args)
def test_missing_file_fails_as_expected():
"""
If the specified location report file isn't there,
locations report should throw a FNF
:return:
"""
file = get_test_data_dir() / "nonexistent.json"
args = ["--location-file", str(file), "--profile", PROFILE]
with pytest.raises(FileNotFoundError):
LocationsReportRefactor(args)
def test_file_returns_expected_report():
"""
Can we get a valid locations report from a report file?
:return:
"""
file = get_test_data_dir() / REPORT_FILES["vla_eb"]
args = ["--location-file", str(file), "--profile", PROFILE]
report = LocationsReportRefactor(args)
assert len(report.servers_report) == 2
assert len(report.files_report) == 2
# TODO: other location report tests from test_df_function, test_df_return_codes
"""add-delivery-results-to-capability-execution
Revision ID: 68d0883785b7
Revises: cb49c557f7e8
Create Date: 2021-04-12 10:30:16.482620
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "68d0883785b7"
down_revision = "cb49c557f7e8"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"capability_executions",
sa.Column("delivery_url", sa.String, comment="URL to the results, if possible"),
)
op.add_column(
"capability_executions",
sa.Column("delivery_path", sa.String, comment="Path to the results on disk"),
)
def downgrade():
op.drop_column("capability_executions", "delivery_url")
op.drop_column("capability_executions", "delivery_path")
......@@ -20,6 +20,10 @@ The messenger object is also responsible for message-to-event
logger = logging.getLogger()
class MessageFormatException(Exception):
pass
class AMQPServerInitializer:
"""
Object responsible for creating a connection to the correct AMQP server and establishing exchanges and queues for
......@@ -153,7 +157,12 @@ class Messenger(ConsumerProducerMixin):
:param message: Body of AMQP message
"""
if "routing_key" not in message.keys():
logger.warning("Message has no routing key.")
if "service" in message.keys():
message["routing_key"] = message["service"]
else:
raise MessageFormatException(
"Message has no routing key. Keys are: " + ", ".join(message.keys())
)
self.producer.publish(
message,
......
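The routing-key fallback means a message carrying only a 'service' key is still routable, while a message with neither key now raises instead of being published unroutable. A standalone mirror of that logic, for illustration only (this helper does not exist in the module):

    def resolve_routing_key(message: dict) -> str:
        # same precedence as Messenger: explicit routing_key, then service, else error
        if "routing_key" in message:
            return message["routing_key"]
        if "service" in message:
            return message["service"]
        raise MessageFormatException(
            "Message has no routing key. Keys are: " + ", ".join(message.keys())
        )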
from __future__ import annotations
import datetime
import pathlib
from typing import Tuple
import pendulum
......@@ -204,6 +205,19 @@ class CapabilityVersion(Base, CapabilityVersionIF):
return self.request.capability
class SaPath(sa.types.TypeDecorator):
impl = sa.types.String
def process_bind_param(self, value, dialect):
return str(value)
def process_result_value(self, value, dialect):
return pathlib.Path(value)
def copy(self, **kw):
return SaPath(self.impl.length)
class CapabilityExecution(Base, CapabilityExecutionIF, JSONSerializable):
"""
Schema representation of a capability request's execution record
......@@ -223,6 +237,8 @@ class CapabilityExecution(Base, CapabilityExecutionIF, JSONSerializable):
steps = sa.Column("steps", sa.String)
version = relationship(CapabilityVersion, back_populates="executions")
current_workflow_request_id = sa.Column("current_workflow_request_id", sa.Integer)
delivery_url = sa.Column("delivery_url", sa.String)
delivery_path = sa.Column("delivery_path", SaPath)
created_at = sa.Column(
"created_at",
......
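SaPath is a small SQLAlchemy TypeDecorator that stringifies pathlib.Path values on the way into a String column and rebuilds them on the way out. A self-contained sketch of the same pattern with None-guarding added; note that the version in this diff would store the literal string "None" for a null value, which only matters if the column is ever nullable:

    import pathlib
    import sqlalchemy as sa

    class SaPath(sa.types.TypeDecorator):
        """Persist pathlib.Path values in a plain String column."""

        impl = sa.types.String

        def process_bind_param(self, value, dialect):
            # Python -> DB: stringify the Path, passing None through
            return str(value) if value is not None else None

        def process_result_value(self, value, dialect):
            # DB -> Python: rebuild the Path, passing None through
            return pathlib.Path(value) if value is not None else None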
from __future__ import annotations
from typing import Dict, List
import pathlib
from typing import List
from workspaces.capability.helpers_interfaces import CapabilityStepIF, ParameterIF
from workspaces.products.schema_interfaces import FutureProductIF
from workspaces.system.schema import JSONSerializable
from workspaces.workflow.schema_interfaces import WorkflowRequestIF
class CapabilityIF(JSONSerializable):
......@@ -45,6 +45,8 @@ class CapabilityExecutionIF:
steps: str
capability: CapabilityIF
capability_request: CapabilityRequestIF
delivery_url: str
delivery_path: pathlib.Path
def on_last_step(self) -> bool:
raise NotImplementedError
......
......@@ -27,10 +27,12 @@ class CapabilityService(CapabilityServiceIF):
The capability service: clients access this to request capability runs
"""
def __init__(self,
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
notification_service: NotificationServiceIF):
def __init__(
self,
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
notification_service: NotificationServiceIF,
):
self.message_router = Router("capability")
self.message_router.register(self)
self.execution_manager = ExecutionManager(
......@@ -91,6 +93,29 @@ class CapabilityService(CapabilityServiceIF):
}
self.message_router.send_message(**capability_complete_msg)
@on_message(service="workflow", type="delivery")
def on_delivery(self, **message: Dict):
"""
Catch a delivery notification and update the locations in the
referenced execution based on it
:param message: a delivery-type notification with a workflow request
subject and a delivery field
"""
logger.info("Received delivery notification: %s", message)
# retrieve the delivery structure in the message
delivery = message["delivery"]
# we caught a delivery event, find the associated capability execution
execution = self.capability_info.lookup_execution_by_workflow_request_id(
message["subject"]["workflow_request_id"]
)
# update some fields on the execution, if possible
execution.delivery_url = delivery["url"] if "url" in delivery else None
execution.delivery_path = delivery["delivered_to"] if "delivered_to" in delivery else None
@on_message(type="capability-submitted")
def notify_submitted(self, **message: Dict):
subject = message["subject"]
......
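For reference, a plausible shape for the delivery notification on_delivery consumes; the field names are inferred from the handler above rather than from a published schema, and both delivery fields are optional:

    message = {
        "service": "workflow",
        "type": "delivery",
        "subject": {"workflow_request_id": 42},
        "delivery": {
            "url": "https://example.org/results/42",  # stored as delivery_url
            "delivered_to": "/lustre/results/42",     # stored as delivery_path
        },
    }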