diff --git a/apps/cli/executables/datafetcher/datafetcher/locations_report_refactor.py b/apps/cli/executables/datafetcher/datafetcher/locations_report_refactor.py new file mode 100644 index 0000000000000000000000000000000000000000..2379321104e2d04dbcfd0b9630f3cdbcc55b0746 --- /dev/null +++ b/apps/cli/executables/datafetcher/datafetcher/locations_report_refactor.py @@ -0,0 +1,207 @@ +""" A locations report is produced by the archive service; you give + it a product locator and it returns a dictionary of details on how + to retrieve the product's files from long term storage (NGAS): this + class handles fetching the report from the service or reading it from + a file, and has utilities to manipulate the report. + +""" + +# pylint: disable=C0301, E0401, E0402, W1203 + +import copy +import http +import json +import logging +from json import JSONDecodeError +from typing import Dict, List + +import requests +from pycapo import CapoConfig + + +from .errors import ( + LocationServiceTimeoutException, + LocationServiceRedirectsException, + LocationServiceErrorException, + NoLocatorException, + MissingSettingsException, +) +from .utilities import Cluster, RetrievalMode, validate_file_spec, get_arg_parser + +# pylint: disable=C0103, R0902, R0903, R0914, W0703, W1203 + +logger = logging.getLogger(__name__) + +REQUIRED_SETTINGS = { + "EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.LOCATORSERVICEURLPREFIX": "locator_service_url", + "EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.EXECUTIONSITE": "execution_site", +} + + +class LocationsReportRefactor: + """ Builds a location report """ + + def __init__(self, args: List[str]): + namespace = get_arg_parser().parse_args(args) + try: + self.capo_config = CapoConfig(profile=namespace.profile) + except Exception as exc: + raise MissingSettingsException("Capo profile is required") from exc + + self.product_locator = namespace.product_locator + self.location_file = namespace.location_file + self.profile = namespace.profile + if not self.product_locator and not self.location_file: + raise NoLocatorException("either product locator or report file must be specified") + + self._run() + + def _run(self): + self.files_report = self._get_files_report() + self.servers_report = self._get_servers_report() + + def _get_files_report(self): + """Given a product locator or a path to a location file, return a + location report: an object describing the files that make up the product + and where to get them from. + If neither argument is provided, throw a ValueError; if both are + (for some reason), then the location file takes precedence. + + :return: location report (from file, in JSON) + """ + result = dict() + try: + if self.location_file is not None: + result = self._get_location_report_from_file() + if self.product_locator is not None: + result = self._get_location_report_from_service() + + return self._add_retrieve_method_field(result) + except JSONDecodeError as js_err: + logger.error(f"Unable to parse {self.location_file}") + raise ReportFileParseException from js_err + + def _get_location_report_from_file(self) -> Dict[str, str]: + """ + Read a file at a user-provided path to pull in the location report. + + :return: + """ + logger.debug(f'fetching files from report "{self.location_file}"') + try: + with open(self.location_file) as to_read: + result = json.load(to_read) + return result + except FileNotFoundError as err: + logger.error(f"{err}") + raise + except JSONDecodeError as js_err: + logger.error(f"Unable to parse {self.location_file}") + raise ReportFileParseException from js_err + except Exception as exc: + logger.error(f"Problem getting location report from f{self.location_file}: {exc}") + raise + + def _get_location_report_from_service(self): + """Use 'requests' to fetch the location report from the locator service. + + :return: location report (from locator service, in JSON) + """ + url = self.capo_config.get( + "edu.nrao.archive.datafetcher.DataFetcherSettings.locatorServiceUrlPrefix" + ) + + logger.debug(f"fetching report from {url} for {self.product_locator}") + + # this is needed to prevent SSL errors when tests are run + # inside a Docker container + requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ":HIGH:!DH:!aNULL" + requests.Session().mount(url, adapter=requests.adapters.HTTPAdapter()) + + response = None + try: + response = requests.get(url, params={"locator": self.product_locator}) + except requests.exceptions.Timeout as exc_to: + raise LocationServiceTimeoutException() from exc_to + except requests.exceptions.TooManyRedirects as exc_re: + raise LocationServiceRedirectsException() from exc_re + except requests.exceptions.RequestException as ex: + raise LocationServiceErrorException(ex) from ex + except Exception as exc: + logger.error(f"{exc}") + + if response.status_code == http.HTTPStatus.OK: + return response.json() + if response.status_code == http.HTTPStatus.NOT_FOUND: + raise UnknownLocatorException(f'locator "{self.product_locator}" ' f"not found") + raise LocationServiceErrorException( + "locator service returned {}".format(response.status_code) + ) + + def _add_retrieve_method_field(self, files_report: Dict): + """This adds a field to the files report about whether we can do + a direct copy or we have to rely on streaming: this is something + the location service itself doesn't know because it depends on + which site data-fetcher is running on, which site has the data and + whether + the NGAS cluster supports direct copy.""" + dsoc_cluster = Cluster.DSOC + exec_site = self.capo_config.getstring( + "edu.nrao.archive.datafetcher.DataFetcherSettings.executionSite" + ) + + for file_spec in files_report["files"]: + validate_file_spec(file_spec, False) + + server = file_spec["server"] + location = server["location"] + if server["cluster"] == dsoc_cluster.value and ( + location == exec_site or location == str(exec_site) + ): + server["retrieve_method"] = RetrievalMode.COPY + else: + server["retrieve_method"] = RetrievalMode.STREAM + + return files_report + + def _get_servers_report(self) -> Dict: + """The location report we get back looks like this, for each file: + + {"ngas_file_id":"17B-197_2018_02_19_T15_59_16.097.tar", + "subdirectory":"17B-197.sb34812522.eb35115211.58168.58572621528", + "relative_path":"17B-197_2018_02_19_T15_59_16.097.tar", + "checksum":"-1848259250", + "checksum_type":"ngamsGenCrc32", + "version":1, + "size":108677120, + "server":{"server":"nmngas01.aoc.nrao.edu:7777", + "location":"DSOC", + "cluster":"DSOC" + }} + + Re-organize it to group files under servers so it is more useful. + """ + + result = {} + for file_spec in self.files_report["files"]: + validate_file_spec(file_spec, True) + new_f = copy.deepcopy(file_spec) + del new_f["server"] + server = file_spec["server"] + server_host = server["server"] + if server_host not in result: + result[server_host] = dict() + result[server_host]["location"] = server["location"] + result[server_host]["cluster"] = server["cluster"] + result[server_host]["retrieve_method"] = server["retrieve_method"] + result[server_host]["files"] = list() + result[server_host]["files"].append(new_f) + return result + + +class ReportFileParseException(Exception): + """Throw this when we're unable to parse a .json """ + + +class UnknownLocatorException(Exception): + """Throw this when we get a product locator we don't know how to handle""" diff --git a/apps/cli/executables/datafetcher/test/df_pytest_arg_utils.py b/apps/cli/executables/datafetcher/test/df_pytest_arg_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..23d86909c80e5a48c986784df5404b43e8364351 --- /dev/null +++ b/apps/cli/executables/datafetcher/test/df_pytest_arg_utils.py @@ -0,0 +1,65 @@ +""" Utilities for passing arguments to datafetcher in tests """ +import os +from pathlib import Path +from typing import List + +from datafetcher.errors import MissingSettingsException, NoProfileException +from datafetcher.utilities import get_arg_parser, path_is_accessible + + +def parse_args(args: List[str]): + """ + Wrapper around parser.parse_args() so in the event of a foo + an exception is thrown rather than sys.exe + + :param args: + :return: + """ + confirm_complete_args(args) + get_arg_parser().parse_args(args) + + +def confirm_complete_args(args: List[str]): + """ + Let's scrutinize the args -before- calling parse_args() + so we can differentiate among errors. + + :param args: + :return: + """ + + # we must have a profile + assert "--profile" in args or "CAPO_PROFILE" in os.environ + if "--profile" not in args and "CAPO_PROFILE" in os.environ: + raise NoProfileException("Capo profile is required.") + + # we must have an output dir.... + assert "--output-dir" in args + + # ... and it must exist + args_iter = iter(args) + args_dict = dict(zip(args_iter, args_iter)) + + output_dir_arg = args_dict["--output-dir"] + output_dir = Path(output_dir_arg) + if not output_dir.exists(): + # did they just forget to specify it? + if output_dir_arg.startswith("--"): + raise MissingSettingsException("--output-dir is required") + raise FileNotFoundError(f"output dir '{output_dir_arg}' not found") + if not path_is_accessible(output_dir): + raise FileNotFoundError(f"permission denied on {output_dir}") + + # we must have EITHER a product locator OR a locations file.... + if "--product-locator" not in args_dict.keys() and "--location-file" not in args_dict.keys(): + raise MissingSettingsException("either product locator or locations file required") + if "--product-locator" in args_dict.keys() and "--location-file" in args_dict.keys(): + raise MissingSettingsException("product locator OR locations file required -- not both") + + if "--product-locator" in args: + product_locator = args_dict["--product-locator"] + assert product_locator + else: + locations_file = args_dict["--location-file"] + if not Path(locations_file).exists(): + raise FileNotFoundError(f"{locations_file} not found") diff --git a/apps/cli/executables/datafetcher/test/fake_archive_service.py b/apps/cli/executables/datafetcher/test/fake_archive_service.py new file mode 100644 index 0000000000000000000000000000000000000000..e80de324e09b6ce3227a1b11a596b1feec81f839 --- /dev/null +++ b/apps/cli/executables/datafetcher/test/fake_archive_service.py @@ -0,0 +1,148 @@ +""" Let's not actually hit the archive service """ + +import logging +import sys +from pathlib import Path + +from datafetcher.locations_report_refactor import LocationsReportRefactor, UnknownLocatorException + +# pylint: disable=C0301, R0903 + +_LOG = logging.getLogger(__name__) + +PROFILE = "docker" + +REPORT_FILES = { + "vla_eb": "17A-109_fg_18468.json", + "calibration": "CALIBRATION.json", + "image": "IMG.json", + "vlba": "VLBA_EB.json", + "gbt": "AGBT17B_044_02.json", + # "alma": "A001_X1296_Xa93_RAW.json", + "empty": "EMPTY.json", + "not_json": "NOT_JSON.json", + "vla_bad_server": "VLA_BAD_SERVER.json", + "vla_eb_busted": "VLA_SMALL_EB_BUSTED.json", +} + + +class FakeArchiveService: + """ Stand-in for archive service """ + + def __init__(self, product_locator: str): + """ + Return locations report for given locator + + :param product_locator: + """ + self.product_locator = product_locator + self.data_dir = get_test_data_dir() + + def get_locations_report(self) -> LocationsReportRefactor: + """ + Depending on the product locator, return locations report for a VLA EB, an image, a VLBA product, etc. + + :return: the location report + """ + + if "vlba" in self.product_locator: + return self._vlba_locations_report() + elif "calibration" in self.product_locator: + return self._calibration_locations_report() + elif "image" in self.product_locator: + return self._image_locations_report() + elif "alma" in self.product_locator: + return self._alma_locations_report() + elif "gbt" in self.product_locator: + return self._gbt_locations_report() + elif "evla/execblock" in self.product_locator: + return self._vla_eb_locations_report() + raise UnknownLocatorException + + def _vla_eb_locations_report(self) -> LocationsReportRefactor: + """ + Read in a VLA EB locations report from a .json file. + + :return: + """ + file = self.data_dir / REPORT_FILES["vla_eb"] + return self._get_location_report_from_file(file) + + def _calibration_locations_report(self) -> LocationsReportRefactor: + """ + Read in a VLA calibrations locations report from a .json file. + + :return: + """ + file = self.data_dir / REPORT_FILES["calibration"] + return self._get_location_report_from_file(file) + + def _image_locations_report(self) -> LocationsReportRefactor: + """ + Read in a VLASS image locations report from a .json file. + + :return: + """ + file = self.data_dir / REPORT_FILES["image"] + return self._get_location_report_from_file(file) + + def _vlba_locations_report(self) -> LocationsReportRefactor: + """ + Read in a VLBA locations report from a .json file. + + :return: + """ + file = self.data_dir / REPORT_FILES["vlba"] + return self._get_location_report_from_file(file) + + def _gbt_locations_report(self) -> LocationsReportRefactor: + """ + Read in a GBT locations report from a .json file. + + :return: + """ + file = self.data_dir / REPORT_FILES["gbt"] + return self._get_location_report_from_file(file) + + def _alma_locations_report(self) -> LocationsReportRefactor: + """ + Read in an ALMA EB locations report from a .json file. + + :return: + """ + file = self.data_dir / REPORT_FILES["alma"] + return self._get_location_report_from_file(file) + + @staticmethod + def _get_location_report_from_file(location_file: Path) -> LocationsReportRefactor: + """ + Read a .json file into a LocationsReport. + + :param location_file: Path to file + :return: the locations report + """ + args = ["--location-file", str(location_file), "--profile", PROFILE] + return LocationsReportRefactor(args) + + +def get_test_data_dir() -> Path: + """ + Find the .json locations report files we use for testing. + + :return: + """ + + top_level_subdirs = sys.path + shared_ws_src = None + for pathname in top_level_subdirs: + # test data will be a few levels under shared_wksp + if "shared/workspaces" in pathname: + shared_ws_src = pathname + break + shared_wksp = Path(shared_ws_src).parent + + for item in shared_wksp.rglob("location_files"): + assert item.is_dir() + return item + + return None diff --git a/apps/cli/executables/datafetcher/test/mock_data_fetcher.py b/apps/cli/executables/datafetcher/test/mock_data_fetcher.py new file mode 100644 index 0000000000000000000000000000000000000000..488f874f7ef033ee18e8827b6fd642c9fc7223e1 --- /dev/null +++ b/apps/cli/executables/datafetcher/test/mock_data_fetcher.py @@ -0,0 +1,50 @@ +""" for testing the attempt to copy rather than stream files """ +import logging +from argparse import Namespace + +# pylint: disable=C0103, C0301, E0401, E0402, R0201, R0902, R0903, W0621 + +from datafetcher.locations_report import LocationsReport +from datafetcher.project_fetcher import ParallelFetcher +from datafetcher.utilities import get_capo_settings, ExecutionSite +from datafetcher.errors import MissingSettingsException + +from .df_pytest_utils import TEST_PROFILE + +logger = logging.getLogger(__name__) + + +class MockProdDataFetcher: + """ Creates and launches a datafetcher using the dsoc-prod profile """ + + def __init__(self, args: Namespace, settings: dict): + if args is None or settings is None: + raise MissingSettingsException() + self.args = args + self.settings = settings + + self.output_dir = args.output_dir + self.profile = args.profile + + self.locations_report = self._get_locations() + self.servers_report = self.locations_report.servers_report + + def _get_locations(self): + """ + Create a locations report with DSOC as exec site + to force copy rather than stream + :return: + """ + + capo_settings = get_capo_settings(TEST_PROFILE) + capo_settings["execution_site"] = ExecutionSite.DSOC.value + + return LocationsReport(self.args, capo_settings) + + def run(self): + """ + identical to DataFetcher.run() + :return: + """ + + return ParallelFetcher(self.args, self.settings, self.servers_report).run() diff --git a/apps/cli/executables/datafetcher/test/test_fake_archive_service.py b/apps/cli/executables/datafetcher/test/test_fake_archive_service.py new file mode 100644 index 0000000000000000000000000000000000000000..4820ceef0124d825e6b54b17bcca55c47d5db3d4 --- /dev/null +++ b/apps/cli/executables/datafetcher/test/test_fake_archive_service.py @@ -0,0 +1,111 @@ +""" Tests to confirm our fake archive service functions as intended """ + +# pylint: disable=E0402, W0511 + +from datafetcher.locations_report_refactor import LocationsReportRefactor + +from .fake_archive_service import FakeArchiveService + +# TODO: move this to appropriate DF test module +# def test_gets_df_settings(): +# capo = CapoConfig(profile=PROFILE) +# key = "edu.nrao.archive.datafetcher.DataFetcherSettings" +# field = "locatorServiceUrlPrefix" +# to_get = key + "." + field +# try: +# url = capo.get(to_get) +# except KeyError as k_err: +# raise MissingSettingsException(f'missing required setting "{field}"') from k_err +# assert "locator" in url +# +# + + +def test_service_returns_vla_eb_locations(): + + """ + Does FakeArchiveService return the calibrations locations report in + test_data? + + :return: + """ + + found = False + report = FakeArchiveService("uid://evla/execblock/123a-456-b-789z").get_locations_report() + + assert len(report.servers_report) == 2 + assert len(report.files_report["files"]) == 79 + + expected_ngas_id = "17A-109_2017_02_11_T18_49_09.756.tar" + rel_path = None + for file_spec in report.files_report["files"]: + if file_spec["ngas_file_id"] == expected_ngas_id: + found = True + rel_path = file_spec["relative_path"] + break + assert found + assert rel_path == expected_ngas_id + + +def test_service_returns_vlba_locations(): + + """ + Does FakeArchiveService return the calibrations locations report in + test_data? + + :return: + """ + + report = FakeArchiveService("uid://evla/vlba/uuid-du-jour31").get_locations_report() + + assert len(report.servers_report) == 1 + assert len(report.files_report["files"]) == 16 + + expected_agg_size = 2140560000 + actual_agg_size = report.files_report["aggregate_size"] + assert actual_agg_size == expected_agg_size + + +def test_service_returns_cal_locations(): + + """ + Does FakeArchiveService return the calibrations locations report in + test_data? + + :return: + """ + + found = False + report = FakeArchiveService("uid://evla/calibration/different-uuid").get_locations_report() + + assert len(report.servers_report) == 1 + assert len(report.files_report) == 2 + + expected_subdir = "18B-265_2019_12_10_T00_00_59.203.tar" + for file_spec in report.files_report["files"]: + if file_spec["subdirectory"] == expected_subdir: + found = True + break + assert found + + +def test_service_returns_image_locations(): + + """ + Does FakeArchiveService return the image locations report in test_data? + + :return: + """ + + report = FakeArchiveService("uid://evla/image/some-uuid").get_locations_report() + assert isinstance(report, LocationsReportRefactor) + assert len(report.servers_report) == 2 + assert len(report.files_report) == 2 + + expected_file_id = "uid____evla_image_56a10be7-f1c2-4788-8651-6ecc5bfbc2f1.fits" + found = False + for file_spec in report.files_report["files"]: + if file_spec["ngas_file_id"] == expected_file_id: + found = True + break + assert found diff --git a/apps/cli/executables/datafetcher/test/test_locations_report.py b/apps/cli/executables/datafetcher/test/test_locations_report.py new file mode 100644 index 0000000000000000000000000000000000000000..41e010e1fd297184f1b0e9a67276597ab66ad4d5 --- /dev/null +++ b/apps/cli/executables/datafetcher/test/test_locations_report.py @@ -0,0 +1,98 @@ +""" Unit tests for LocationsReportRefactor """ + +# pylint: disable=W0511, E0401, E0402 + +import pytest + +from datafetcher.locations_report_refactor import ( + LocationsReportRefactor, + ReportFileParseException, + UnknownLocatorException, +) + +from .fake_archive_service import REPORT_FILES, get_test_data_dir + +PROFILE = "docker" + + +def test_service_returns_expected_report(): + """ + Does FakeArchiveService return a valid locations report? + + :return: + """ + product_locator = "uid://evla/image/7a546de2-ab1f-4915-a8dc-94f572f9215c" + args = ["--product-locator", product_locator, "--profile", PROFILE] + report = LocationsReportRefactor(args) + + assert report.files_report is not None + assert len(report.servers_report) == 2 + assert len(report.files_report) == 2 + + +def test_empty_file_fails_as_expected(): + """ + When we attempt to create a locations report from a blank + .json file, do we get the expected error? + + :return: + """ + file = get_test_data_dir() / REPORT_FILES["empty"] + args = ["--location-file", str(file), "--profile", PROFILE] + with pytest.raises(ReportFileParseException): + LocationsReportRefactor(args) + + +def test_bad_json_fails_as_expected(): + """ + When we attempt to create a locations report from a badly-formed + .json file, do we get the expected error? + + :return: + """ + file = get_test_data_dir() / REPORT_FILES["not_json"] + args = ["--location-file", str(file), "--profile", PROFILE] + with pytest.raises(ReportFileParseException): + LocationsReportRefactor(args) + + +def test_bad_locator_fails_as_expected(): + """ + An invalid locator should throw UnknownLocatorException + + :return: + """ + product_locator = "uid://this/is/b00000gus" + args = ["--product-locator", product_locator, "--profile", PROFILE] + with pytest.raises(UnknownLocatorException): + LocationsReportRefactor(args) + + +def test_missing_file_fails_as_expected(): + """ + If the specified location report file isn't there, + locations report should throw a FNF + + :return: + """ + file = get_test_data_dir() / "nonexistent.json" + args = ["--location-file", str(file), "--profile", PROFILE] + with pytest.raises(FileNotFoundError): + LocationsReportRefactor(args) + + +def test_file_returns_expected_report(): + """ + Can we get a valid locations report from a report file? + + :return: + """ + + file = get_test_data_dir() / REPORT_FILES["vla_eb"] + args = ["--location-file", str(file), "--profile", PROFILE] + report = LocationsReportRefactor(args) + assert len(report.servers_report) == 2 + assert len(report.files_report) == 2 + + +# TODO: other location report tests from test_df_function, test_df_return_codes