Commit b1c5a8b0 authored by Charlotte Hausman

forgot to walk back the tests too

parent 3032923d
1 merge request: !188 fixing download capability
Pipeline #1307 passed
""" A locations report is produced by the archive service; you give
it a product locator and it returns a dictionary of details on how
to retrieve the product's files from long term storage (NGAS): this
class handles fetching the report from the service or reading it from
a file, and has utilities to manipulate the report.
"""
# pylint: disable=C0301, E0401, E0402, W1203
import copy
import http
import json
import logging
from json import JSONDecodeError
from typing import Dict, List
import requests
from pycapo import CapoConfig
from .errors import (
LocationServiceTimeoutException,
LocationServiceRedirectsException,
LocationServiceErrorException,
NoLocatorException,
MissingSettingsException,
)
from .utilities import Cluster, RetrievalMode, validate_file_spec, get_arg_parser
# pylint: disable=C0103, R0902, R0903, R0914, W0703, W1203
logger = logging.getLogger(__name__)
REQUIRED_SETTINGS = {
"EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.LOCATORSERVICEURLPREFIX": "locator_service_url",
"EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.EXECUTIONSITE": "execution_site",
}
class LocationsReportRefactor:
""" Builds a location report """
def __init__(self, args: List[str]):
namespace = get_arg_parser().parse_args(args)
try:
self.capo_config = CapoConfig(profile=namespace.profile)
except Exception as exc:
raise MissingSettingsException("Capo profile is required") from exc
self.product_locator = namespace.product_locator
self.location_file = namespace.location_file
self.profile = namespace.profile
if not self.product_locator and not self.location_file:
raise NoLocatorException("either product locator or report file must be specified")
self._run()
def _run(self):
self.files_report = self._get_files_report()
self.servers_report = self._get_servers_report()
def _get_files_report(self):
"""Given a product locator or a path to a location file, return a
location report: an object describing the files that make up the product
and where to get them from.
If neither argument is provided, the constructor raises NoLocatorException;
if both are provided (for some reason), the location file takes precedence.
:return: location report (from file or the locator service, as parsed JSON)
"""
result = dict()
try:
if self.location_file is not None:
result = self._get_location_report_from_file()
elif self.product_locator is not None:
result = self._get_location_report_from_service()
return self._add_retrieve_method_field(result)
except JSONDecodeError as js_err:
logger.error(f"Unable to parse {self.location_file}")
raise ReportFileParseException from js_err
def _get_location_report_from_file(self) -> Dict[str, str]:
"""
Read a file at a user-provided path to pull in the location report.
:return:
"""
logger.debug(f'fetching files from report "{self.location_file}"')
try:
with open(self.location_file) as to_read:
result = json.load(to_read)
return result
except FileNotFoundError as err:
logger.error(f"{err}")
raise
except JSONDecodeError as js_err:
logger.error(f"Unable to parse {self.location_file}")
raise ReportFileParseException from js_err
except Exception as exc:
logger.error(f"Problem getting location report from f{self.location_file}: {exc}")
raise
def _get_location_report_from_service(self):
"""Use 'requests' to fetch the location report from the locator service.
:return: location report (from locator service, in JSON)
"""
url = self.capo_config.get(
"edu.nrao.archive.datafetcher.DataFetcherSettings.locatorServiceUrlPrefix"
)
logger.debug(f"fetching report from {url} for {self.product_locator}")
# this is needed to prevent SSL errors when tests are run
# inside a Docker container
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ":HIGH:!DH:!aNULL"
requests.Session().mount(url, adapter=requests.adapters.HTTPAdapter())
response = None
try:
response = requests.get(url, params={"locator": self.product_locator})
except requests.exceptions.Timeout as exc_to:
raise LocationServiceTimeoutException() from exc_to
except requests.exceptions.TooManyRedirects as exc_re:
raise LocationServiceRedirectsException() from exc_re
except requests.exceptions.RequestException as ex:
raise LocationServiceErrorException(ex) from ex
except Exception as exc:
logger.error(f"{exc}")
raise
if response.status_code == http.HTTPStatus.OK:
return response.json()
if response.status_code == http.HTTPStatus.NOT_FOUND:
raise UnknownLocatorException(f'locator "{self.product_locator}" not found')
raise LocationServiceErrorException(f"locator service returned {response.status_code}")
def _add_retrieve_method_field(self, files_report: Dict):
"""This adds a field to the files report about whether we can do
a direct copy or we have to rely on streaming: this is something
the location service itself doesn't know because it depends on
which site data-fetcher is running on, which site has the data and
whether
the NGAS cluster supports direct copy."""
dsoc_cluster = Cluster.DSOC
exec_site = self.capo_config.getstring(
"edu.nrao.archive.datafetcher.DataFetcherSettings.executionSite"
)
for file_spec in files_report["files"]:
validate_file_spec(file_spec, False)
server = file_spec["server"]
location = server["location"]
if server["cluster"] == dsoc_cluster.value and (
location == exec_site or location == str(exec_site)
):
server["retrieve_method"] = RetrievalMode.COPY
else:
server["retrieve_method"] = RetrievalMode.STREAM
return files_report
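# Illustration (hypothetical values): if the configured execution_site is "DSOC",
# a file served from {"cluster": "DSOC", "location": "DSOC"} is marked
# retrieve_method = RetrievalMode.COPY, while a file served from any other
# cluster/location combination falls back to RetrievalMode.STREAM.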
def _get_servers_report(self) -> Dict:
"""The location report we get back looks like this, for each file:
{"ngas_file_id":"17B-197_2018_02_19_T15_59_16.097.tar",
"subdirectory":"17B-197.sb34812522.eb35115211.58168.58572621528",
"relative_path":"17B-197_2018_02_19_T15_59_16.097.tar",
"checksum":"-1848259250",
"checksum_type":"ngamsGenCrc32",
"version":1,
"size":108677120,
"server":{"server":"nmngas01.aoc.nrao.edu:7777",
"location":"DSOC",
"cluster":"DSOC"
}}
Re-organize it to group files under servers so it is more useful.
"""
result = {}
for file_spec in self.files_report["files"]:
validate_file_spec(file_spec, True)
new_f = copy.deepcopy(file_spec)
del new_f["server"]
server = file_spec["server"]
server_host = server["server"]
if server_host not in result:
result[server_host] = dict()
result[server_host]["location"] = server["location"]
result[server_host]["cluster"] = server["cluster"]
result[server_host]["retrieve_method"] = server["retrieve_method"]
result[server_host]["files"] = list()
result[server_host]["files"].append(new_f)
return result
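# For the example file in the docstring above, the regrouped report would look
# roughly like this (retrieve_method depends on the execution site; see
# _add_retrieve_method_field):
# {"nmngas01.aoc.nrao.edu:7777": {
#     "location": "DSOC",
#     "cluster": "DSOC",
#     "retrieve_method": RetrievalMode.COPY,  # or RetrievalMode.STREAM
#     "files": [{"ngas_file_id": "17B-197_2018_02_19_T15_59_16.097.tar", ...}],
# }}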
class ReportFileParseException(Exception):
"""Throw this when we're unable to parse a .json """
class UnknownLocatorException(Exception):
"""Throw this when we get a product locator we don't know how to handle"""
""" Utilities for passing arguments to datafetcher in tests """
import os
from pathlib import Path
from typing import List
from datafetcher.errors import MissingSettingsException, NoProfileException
from datafetcher.utilities import get_arg_parser, path_is_accessible
def parse_args(args: List[str]):
"""
Wrapper around parser.parse_args() so that, in the event of a bad or
missing argument, an exception is thrown rather than sys.exit() being called.
:param args:
:return:
"""
confirm_complete_args(args)
return get_arg_parser().parse_args(args)
def confirm_complete_args(args: List[str]):
"""
Let's scrutinize the args -before- calling parse_args()
so we can differentiate among errors.
:param args:
:return:
"""
# we must have a profile, either on the command line or in the environment
if "--profile" not in args and "CAPO_PROFILE" not in os.environ:
raise NoProfileException("Capo profile is required.")
# we must have an output dir....
assert "--output-dir" in args
# ... and it must exist
args_iter = iter(args)
args_dict = dict(zip(args_iter, args_iter))
output_dir_arg = args_dict["--output-dir"]
output_dir = Path(output_dir_arg)
if not output_dir.exists():
# did they just forget to specify it?
if output_dir_arg.startswith("--"):
raise MissingSettingsException("--output-dir is required")
raise FileNotFoundError(f"output dir '{output_dir_arg}' not found")
if not path_is_accessible(output_dir):
raise FileNotFoundError(f"permission denied on {output_dir}")
# we must have EITHER a product locator OR a locations file....
if "--product-locator" not in args_dict.keys() and "--location-file" not in args_dict.keys():
raise MissingSettingsException("either product locator or locations file required")
if "--product-locator" in args_dict.keys() and "--location-file" in args_dict.keys():
raise MissingSettingsException("product locator OR locations file required -- not both")
if "--product-locator" in args:
product_locator = args_dict["--product-locator"]
assert product_locator
else:
locations_file = args_dict["--location-file"]
if not Path(locations_file).exists():
raise FileNotFoundError(f"{locations_file} not found")
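As an illustration of how these helpers behave (the paths and locator below are hypothetical), a caller might do:

# a complete, valid argument list: validation passes and the args parse cleanly
good_args = [
    "--profile", "docker",
    "--output-dir", "/tmp/df_out",          # must exist and be writable
    "--product-locator", "uid://evla/execblock/abc123",
]
parse_args(good_args)

# omitting both --product-locator and --location-file fails validation:
# confirm_complete_args(bad_args) raises MissingSettingsException
bad_args = ["--profile", "docker", "--output-dir", "/tmp/df_out"]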
""" Let's not actually hit the archive service """
import logging
import sys
from pathlib import Path
from datafetcher.locations_report_refactor import LocationsReportRefactor, UnknownLocatorException
# pylint: disable=C0301, R0903
_LOG = logging.getLogger(__name__)
PROFILE = "docker"
REPORT_FILES = {
"vla_eb": "17A-109_fg_18468.json",
"calibration": "CALIBRATION.json",
"image": "IMG.json",
"vlba": "VLBA_EB.json",
"gbt": "AGBT17B_044_02.json",
# "alma": "A001_X1296_Xa93_RAW.json",
"empty": "EMPTY.json",
"not_json": "NOT_JSON.json",
"vla_bad_server": "VLA_BAD_SERVER.json",
"vla_eb_busted": "VLA_SMALL_EB_BUSTED.json",
}
class FakeArchiveService:
""" Stand-in for archive service """
def __init__(self, product_locator: str):
"""
Return locations report for given locator
:param product_locator:
"""
self.product_locator = product_locator
self.data_dir = get_test_data_dir()
def get_locations_report(self) -> LocationsReportRefactor:
"""
Depending on the product locator, return locations report for a VLA EB, an image, a VLBA product, etc.
:return: the location report
"""
if "vlba" in self.product_locator:
return self._vlba_locations_report()
elif "calibration" in self.product_locator:
return self._calibration_locations_report()
elif "image" in self.product_locator:
return self._image_locations_report()
elif "alma" in self.product_locator:
return self._alma_locations_report()
elif "gbt" in self.product_locator:
return self._gbt_locations_report()
elif "evla/execblock" in self.product_locator:
return self._vla_eb_locations_report()
raise UnknownLocatorException
def _vla_eb_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLA EB locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["vla_eb"]
return self._get_location_report_from_file(file)
def _calibration_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLA calibrations locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["calibration"]
return self._get_location_report_from_file(file)
def _image_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLASS image locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["image"]
return self._get_location_report_from_file(file)
def _vlba_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLBA locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["vlba"]
return self._get_location_report_from_file(file)
def _gbt_locations_report(self) -> LocationsReportRefactor:
"""
Read in a GBT locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["gbt"]
return self._get_location_report_from_file(file)
def _alma_locations_report(self) -> LocationsReportRefactor:
"""
Read in an ALMA EB locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["alma"]
return self._get_location_report_from_file(file)
@staticmethod
def _get_location_report_from_file(location_file: Path) -> LocationsReportRefactor:
"""
Read a .json file into a LocationsReport.
:param location_file: Path to file
:return: the locations report
"""
args = ["--location-file", str(location_file), "--profile", PROFILE]
return LocationsReportRefactor(args)
def get_test_data_dir() -> Path:
"""
Find the .json locations report files we use for testing.
:return:
"""
top_level_subdirs = sys.path
shared_ws_src = None
for pathname in top_level_subdirs:
# test data will be a few levels under shared_wksp
if "shared/workspaces" in pathname:
shared_ws_src = pathname
break
shared_wksp = Path(shared_ws_src).parent
for item in shared_wksp.rglob("location_files"):
assert item.is_dir()
return item
return None
""" for testing the attempt to copy rather than stream files """
import logging
from argparse import Namespace
# pylint: disable=C0103, C0301, E0401, E0402, R0201, R0902, R0903, W0621
from datafetcher.locations_report import LocationsReport
from datafetcher.project_fetcher import ParallelFetcher
from datafetcher.utilities import get_capo_settings, ExecutionSite
from datafetcher.errors import MissingSettingsException
from .df_pytest_utils import TEST_PROFILE
logger = logging.getLogger(__name__)
class MockProdDataFetcher:
""" Creates and launches a datafetcher using the dsoc-prod profile """
def __init__(self, args: Namespace, settings: dict):
if args is None or settings is None:
raise MissingSettingsException()
self.args = args
self.settings = settings
self.output_dir = args.output_dir
self.profile = args.profile
self.locations_report = self._get_locations()
self.servers_report = self.locations_report.servers_report
def _get_locations(self):
"""
Create a locations report with DSOC as exec site
to force copy rather than stream
:return:
"""
capo_settings = get_capo_settings(TEST_PROFILE)
capo_settings["execution_site"] = ExecutionSite.DSOC.value
return LocationsReport(self.args, capo_settings)
def run(self):
"""
identical to DataFetcher.run()
:return:
"""
return ParallelFetcher(self.args, self.settings, self.servers_report).run()
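A rough sketch of how a test might drive MockProdDataFetcher; the Namespace fields and values here are illustrative only (real tests build the args and Capo settings through the shared pytest fixtures):

from argparse import Namespace

from datafetcher.utilities import get_capo_settings

args = Namespace(
    profile="dsoc-prod",                              # per the class docstring
    output_dir="/tmp/df_out",                         # hypothetical, must exist
    product_locator="uid://evla/execblock/abc123",    # hypothetical locator
    location_file=None,
)
settings = get_capo_settings(args.profile)
fetcher = MockProdDataFetcher(args, settings)
fetcher.run()   # runs ParallelFetcher with DSOC as exec site, forcing the copy path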
""" Tests to confirm our fake archive service functions as intended """
# pylint: disable=E0402, W0511
from datafetcher.locations_report_refactor import LocationsReportRefactor
from .fake_archive_service import FakeArchiveService
# TODO: move this to appropriate DF test module
# def test_gets_df_settings():
# capo = CapoConfig(profile=PROFILE)
# key = "edu.nrao.archive.datafetcher.DataFetcherSettings"
# field = "locatorServiceUrlPrefix"
# to_get = key + "." + field
# try:
# url = capo.get(to_get)
# except KeyError as k_err:
# raise MissingSettingsException(f'missing required setting "{field}"') from k_err
# assert "locator" in url
#
#
def test_service_returns_vla_eb_locations():
"""
Does FakeArchiveService return the VLA EB locations report in
test_data?
:return:
"""
found = False
report = FakeArchiveService("uid://evla/execblock/123a-456-b-789z").get_locations_report()
assert len(report.servers_report) == 2
assert len(report.files_report["files"]) == 79
expected_ngas_id = "17A-109_2017_02_11_T18_49_09.756.tar"
rel_path = None
for file_spec in report.files_report["files"]:
if file_spec["ngas_file_id"] == expected_ngas_id:
found = True
rel_path = file_spec["relative_path"]
break
assert found
assert rel_path == expected_ngas_id
def test_service_returns_vlba_locations():
"""
Does FakeArchiveService return the VLBA locations report in
test_data?
:return:
"""
report = FakeArchiveService("uid://evla/vlba/uuid-du-jour31").get_locations_report()
assert len(report.servers_report) == 1
assert len(report.files_report["files"]) == 16
expected_agg_size = 2140560000
actual_agg_size = report.files_report["aggregate_size"]
assert actual_agg_size == expected_agg_size
def test_service_returns_cal_locations():
"""
Does FakeArchiveService return the calibrations locations report in
test_data?
:return:
"""
found = False
report = FakeArchiveService("uid://evla/calibration/different-uuid").get_locations_report()
assert len(report.servers_report) == 1
assert len(report.files_report) == 2
expected_subdir = "18B-265_2019_12_10_T00_00_59.203.tar"
for file_spec in report.files_report["files"]:
if file_spec["subdirectory"] == expected_subdir:
found = True
break
assert found
def test_service_returns_image_locations():
"""
Does FakeArchiveService return the image locations report in test_data?
:return:
"""
report = FakeArchiveService("uid://evla/image/some-uuid").get_locations_report()
assert isinstance(report, LocationsReportRefactor)
assert len(report.servers_report) == 2
assert len(report.files_report) == 2
expected_file_id = "uid____evla_image_56a10be7-f1c2-4788-8651-6ecc5bfbc2f1.fits"
found = False
for file_spec in report.files_report["files"]:
if file_spec["ngas_file_id"] == expected_file_id:
found = True
break
assert found
""" Unit tests for LocationsReportRefactor """
# pylint: disable=W0511, E0401, E0402
import pytest
from datafetcher.locations_report_refactor import (
LocationsReportRefactor,
ReportFileParseException,
UnknownLocatorException,
)
from .fake_archive_service import REPORT_FILES, get_test_data_dir
PROFILE = "docker"
def test_service_returns_expected_report():
"""
Does FakeArchiveService return a valid locations report?
:return:
"""
product_locator = "uid://evla/image/7a546de2-ab1f-4915-a8dc-94f572f9215c"
args = ["--product-locator", product_locator, "--profile", PROFILE]
report = LocationsReportRefactor(args)
assert report.files_report is not None
assert len(report.servers_report) == 2
assert len(report.files_report) == 2
def test_empty_file_fails_as_expected():
"""
When we attempt to create a locations report from a blank
.json file, do we get the expected error?
:return:
"""
file = get_test_data_dir() / REPORT_FILES["empty"]
args = ["--location-file", str(file), "--profile", PROFILE]
with pytest.raises(ReportFileParseException):
LocationsReportRefactor(args)
def test_bad_json_fails_as_expected():
"""
When we attempt to create a locations report from a badly-formed
.json file, do we get the expected error?
:return:
"""
file = get_test_data_dir() / REPORT_FILES["not_json"]
args = ["--location-file", str(file), "--profile", PROFILE]
with pytest.raises(ReportFileParseException):
LocationsReportRefactor(args)
def test_bad_locator_fails_as_expected():
"""
An invalid locator should throw UnknownLocatorException
:return:
"""
product_locator = "uid://this/is/b00000gus"
args = ["--product-locator", product_locator, "--profile", PROFILE]
with pytest.raises(UnknownLocatorException):
LocationsReportRefactor(args)
def test_missing_file_fails_as_expected():
"""
If the specified location report file isn't there,
the locations report should raise FileNotFoundError.
:return:
"""
file = get_test_data_dir() / "nonexistent.json"
args = ["--location-file", str(file), "--profile", PROFILE]
with pytest.raises(FileNotFoundError):
LocationsReportRefactor(args)
def test_file_returns_expected_report():
"""
Can we get a valid locations report from a report file?
:return:
"""
file = get_test_data_dir() / REPORT_FILES["vla_eb"]
args = ["--location-file", str(file), "--profile", PROFILE]
report = LocationsReportRefactor(args)
assert len(report.servers_report) == 2
assert len(report.files_report) == 2
# TODO: other location report tests from test_df_function, test_df_return_codes