Skip to content
Snippets Groups Projects
Commit cababe8b authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Revert "WS-179-divorce-archive-service"

This reverts commit 08dbc452.
parent d7ad4b38
No related branches found
No related tags found
1 merge request: !167 Revert "WS-179-divorce-archive-service"
Pipeline #1122 passed
""" A locations report is produced by the archive service; you give
it a product locator and it returns a dictionary of details on how
to retrieve the product's files from long term storage (NGAS): this
class handles fetching the report from the service or reading it from
a file, and has utilities to manipulate the report.
"""
# pylint: disable=C0301, E0401, E0402, W1203
import copy
import http
import json
import logging
from json import JSONDecodeError
from typing import Dict, List
import requests
from pycapo import CapoConfig
from .errors import (
LocationServiceTimeoutException,
LocationServiceRedirectsException,
LocationServiceErrorException,
NoLocatorException,
MissingSettingsException,
)
from .utilities import Cluster, RetrievalMode, validate_file_spec, get_arg_parser
# pylint: disable=C0103, R0902, R0903, R0914, W0703, W1203
# Module-level logger; DEBUG is forced globally for data-fetcher diagnostics.
_LOG = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
# Capo settings that must be present for a report to be built, mapped from
# their (upper-cased) Capo keys to the short names they are used under.
REQUIRED_SETTINGS = {
"EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.LOCATORSERVICEURLPREFIX": "locator_service_url",
"EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.EXECUTIONSITE": "execution_site",
}
class LocationsReportRefactor:
    """Builds a location report.

    A locations report describes the files that make up a product and how to
    retrieve them from long-term storage (NGAS).  The report is obtained
    either from the archive's locator service (given a product locator) or
    from a JSON file on disk; this class also reshapes the report into a
    per-server view (``servers_report``).
    """

    def __init__(self, args: List[str]):
        """
        :param args: command-line style argument list; must include
            ``--profile`` and at least one of ``--product-locator`` /
            ``--location-file``
        :raises MissingSettingsException: if the Capo profile can't be loaded
        :raises NoLocatorException: if neither a locator nor a report file
            is supplied
        """
        namespace = get_arg_parser().parse_args(args)
        try:
            self.capo_config = CapoConfig(profile=namespace.profile)
        except Exception as exc:
            raise MissingSettingsException("Capo profile is required") from exc
        self.product_locator = namespace.product_locator
        self.location_file = namespace.location_file
        self.profile = namespace.profile
        if not self.product_locator and not self.location_file:
            raise NoLocatorException("either product locator or report file must be specified")
        self._run()

    def _run(self):
        # Build both views up front so callers can use either immediately.
        self.files_report = self._get_files_report()
        self.servers_report = self._get_servers_report()

    def _get_files_report(self):
        """Given a product locator or a path to a location file, return a
        location report: an object describing the files that make up the product
        and where to get them from.

        If both a location file and a product locator are provided (for some
        reason), the location file takes precedence.

        :return: location report (from file, in JSON)
        :raises ReportFileParseException: if the report file isn't valid JSON
        """
        result = {}
        try:
            # BUG FIX: the location file takes precedence, as documented.
            # Previously both branches ran, so a product locator silently
            # overwrote the result read from the file.
            if self.location_file is not None:
                result = self._get_location_report_from_file()
            elif self.product_locator is not None:
                result = self._get_location_report_from_service()
            return self._add_retrieve_method_field(result)
        except JSONDecodeError as js_err:
            _LOG.error(f"Unable to parse {self.location_file}")
            raise ReportFileParseException from js_err

    def _get_location_report_from_file(self) -> Dict[str, str]:
        """
        Read a file at a user-provided path to pull in the location report.

        :return: the parsed JSON report
        :raises FileNotFoundError: if the report file does not exist
        :raises ReportFileParseException: if the file isn't valid JSON
        """
        _LOG.debug(f'fetching files from report "{self.location_file}"')
        try:
            with open(self.location_file) as to_read:
                result = json.load(to_read)
                return result
        except FileNotFoundError as err:
            _LOG.error(f"{err}")
            raise
        except JSONDecodeError as js_err:
            _LOG.error(f"Unable to parse {self.location_file}")
            raise ReportFileParseException from js_err
        except Exception as exc:
            # BUG FIX: message previously read "from f{...}" — a stray "f"
            # left over from an f-string edit.
            _LOG.error(f"Problem getting location report from {self.location_file}: {exc}")
            raise

    def _get_location_report_from_service(self):
        """Use 'requests' to fetch the location report from the locator service.

        :return: location report (from locator service, in JSON)
        :raises LocationServiceTimeoutException: on request timeout
        :raises LocationServiceRedirectsException: on too many redirects
        :raises LocationServiceErrorException: on any other request failure,
            or a non-OK/non-404 response
        :raises UnknownLocatorException: if the service returns 404
        """
        url = self.capo_config.get(
            "edu.nrao.archive.datafetcher.DataFetcherSettings.locatorServiceUrlPrefix"
        )
        _LOG.debug(f"fetching report from {url} for {self.product_locator}")

        # this is needed to prevent SSL errors when tests are run
        # inside a Docker container
        requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ":HIGH:!DH:!aNULL"
        requests.Session().mount(url, adapter=requests.adapters.HTTPAdapter())
        response = None
        try:
            # NOTE(review): no timeout= is passed, so the Timeout handler
            # below only fires on socket-level defaults — confirm intended.
            response = requests.get(url, params={"locator": self.product_locator})
        except requests.exceptions.Timeout as exc_to:
            raise LocationServiceTimeoutException() from exc_to
        except requests.exceptions.TooManyRedirects as exc_re:
            raise LocationServiceRedirectsException() from exc_re
        except requests.exceptions.RequestException as ex:
            raise LocationServiceErrorException(ex) from ex

        if response.status_code == http.HTTPStatus.OK:
            return response.json()
        if response.status_code == http.HTTPStatus.NOT_FOUND:
            raise UnknownLocatorException(f'locator "{self.product_locator}" ' f"not found")
        raise LocationServiceErrorException(
            "locator service returned {}".format(response.status_code)
        )

    def _add_retrieve_method_field(self, files_report: Dict):
        """This adds a field to the files report about whether we can do
        a direct copy or we have to rely on streaming: this is something
        the location service itself doesn't know because it depends on
        which site data-fetcher is running on, which site has the data and
        whether the NGAS cluster supports direct copy."""
        dsoc_cluster = Cluster.DSOC
        exec_site = self.capo_config.getstring(
            "edu.nrao.archive.datafetcher.DataFetcherSettings.executionSite"
        )
        for file_spec in files_report["files"]:
            validate_file_spec(file_spec, False)
            server = file_spec["server"]
            location = server["location"]
            # Direct copy only when the file is on the DSOC cluster AND at
            # the same site this data-fetcher is executing on.
            if server["cluster"] == dsoc_cluster.value and (
                location == exec_site or location == str(exec_site)
            ):
                server["retrieve_method"] = RetrievalMode.COPY
            else:
                server["retrieve_method"] = RetrievalMode.STREAM
        return files_report

    def _get_servers_report(self) -> Dict:
        """The location report we get back looks like this, for each file:

        {"ngas_file_id":"17B-197_2018_02_19_T15_59_16.097.tar",
        "subdirectory":"17B-197.sb34812522.eb35115211.58168.58572621528",
        "relative_path":"17B-197_2018_02_19_T15_59_16.097.tar",
        "checksum":"-1848259250",
        "checksum_type":"ngamsGenCrc32",
        "version":1,
        "size":108677120,
        "server":{"server":"nmngas01.aoc.nrao.edu:7777",
        "location":"DSOC",
        "cluster":"DSOC"
        }}

        Re-organize it to group files under servers so it is more useful.

        :return: dict keyed by server host, each value holding the server's
            location/cluster/retrieve_method plus its list of files
        """
        result = {}
        for file_spec in self.files_report["files"]:
            validate_file_spec(file_spec, True)
            # Copy the spec minus its "server" key; the server metadata
            # becomes the grouping key instead.
            new_f = copy.deepcopy(file_spec)
            del new_f["server"]
            server = file_spec["server"]
            server_host = server["server"]
            if server_host not in result:
                result[server_host] = {
                    "location": server["location"],
                    "cluster": server["cluster"],
                    "retrieve_method": server["retrieve_method"],
                    "files": [],
                }
            result[server_host]["files"].append(new_f)
        return result
class ReportFileParseException(Exception):
    """Raised when a locations-report .json file cannot be parsed."""
class UnknownLocatorException(Exception):
    """Raised on a product locator we don't know how to handle."""
......@@ -7,7 +7,6 @@ import sys
from argparse import Namespace
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from pprint import pprint
from typing import Dict
from datafetcher.errors import NGASServiceErrorException
......@@ -133,34 +132,26 @@ class ParallelFetcher(BaseFetcher):
def fetch_bucket(self, bucket):
""" Grab the files in this bucket """
file_sizes = [file["size"] for file in bucket["files"]]
pprint(f"retrieving files {file_sizes} from {bucket['server']}")
self._LOG.debug(
f"{bucket['retrieve_method']} "
f"{len(bucket['files'])} files from "
f"{bucket['server']}...."
)
num_files = self.retrieve_files(
bucket["server"], bucket["retrieve_method"], bucket["files"]
)
print(f"done retrieving files {file_sizes} from {bucket['server']}")
return num_files
self.retrieve_files(bucket["server"], bucket["retrieve_method"], bucket["files"])
def run(self):
""" Fetch all the files for the product locator """
print(f"running {self.__class__.__name__}")
if self.args.dry_run:
self._LOG.debug("This is a dry run; files will not be fetched")
return 0
with ThreadPoolExecutor() as executor:
results = executor.map(self.fetch_bucket, self.bucketized_files)
print(f"results: {results}")
try:
for result in results:
print(f"result has arrived: {result}")
self.num_files_retrieved += result
futures = as_completed(results)
for future in futures:
self.num_files_retrieved += future.result()
if self.num_files_retrieved != self.num_files_expected:
self._LOG.error(
f"{self.num_files_expected} files expected, "
......@@ -169,7 +160,6 @@ class ParallelFetcher(BaseFetcher):
self._exit_with_error(ReturnCode.NGAS_FETCH_ERROR)
# successful retrieval
print("returning")
return 0
except (FileExistsError, NGASServiceErrorException) as exc:
self._LOG.error(f"{exc}")
......
""" Let's not actually hit the archive service """
import logging
import sys
from pathlib import Path
from datafetcher.locations_report_refactor import LocationsReportRefactor, UnknownLocatorException
# pylint: disable=C0301, R0903
# Module-level logger; DEBUG forced for test diagnostics.
_LOG = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
# Capo profile used for every report the fake service builds.
PROFILE = "docker"
# Canned locations-report .json files in test_data, keyed by product flavor.
# The "alma" entry is commented out -- FakeArchiveService still routes "alma"
# locators to it, which would raise a KeyError if exercised.
REPORT_FILES = {
"vla_eb": "17A-109_fg_18468.json",
"calibration": "CALIBRATION.json",
"image": "IMG.json",
"vlba": "VLBA_EB.json",
"gbt": "AGBT17B_044_02.json",
# "alma": "A001_X1296_Xa93_RAW.json",
"empty": "EMPTY.json",
"not_json": "NOT_JSON.json",
"vla_bad_server": "VLA_BAD_SERVER.json",
"vla_eb_busted": "VLA_SMALL_EB_BUSTED.json",
}
class FakeArchiveService:
    """Stand-in for the archive service: serves canned locations reports from
    the .json files in test_data instead of calling the locator service."""

    # Ordered (locator substring, REPORT_FILES key) pairs; the first substring
    # found in the product locator wins.  Replaces six copy-paste
    # _xxx_locations_report methods that differed only in the file key.
    _LOCATOR_PATTERNS = (
        ("vlba", "vlba"),
        ("calibration", "calibration"),
        ("image", "image"),
        ("alma", "alma"),
        ("gbt", "gbt"),
        ("evla/execblock", "vla_eb"),
    )

    def __init__(self, product_locator: str):
        """
        Return locations report for given locator.

        :param product_locator: locator that selects which canned report to serve
        """
        self.product_locator = product_locator
        self.data_dir = get_test_data_dir()

    def get_locations_report(self) -> LocationsReportRefactor:
        """
        Depending on the product locator, return locations report for a VLA EB,
        an image, a VLBA product, etc.

        :return: the location report
        :raises UnknownLocatorException: if no known pattern matches the locator
        """
        for pattern, report_key in self._LOCATOR_PATTERNS:
            if pattern in self.product_locator:
                file = self.data_dir / REPORT_FILES[report_key]
                return self._get_location_report_from_file(file)
        # Include the offending locator so failures are diagnosable.
        raise UnknownLocatorException(self.product_locator)

    @staticmethod
    def _get_location_report_from_file(location_file: Path) -> LocationsReportRefactor:
        """
        Read a .json file into a LocationsReport.

        :param location_file: Path to file
        :return: the locations report
        """
        args = ["--location-file", str(location_file), "--profile", PROFILE]
        return LocationsReportRefactor(args)
def get_test_data_dir() -> Path:
    """
    Find the "location_files" directory of .json locations report files we
    use for testing, by searching under the parent of any sys.path entry
    that lives in shared/workspaces.

    :return: path to the test-data directory, or None if it can't be found
    """
    shared_ws_src = next(
        (pathname for pathname in sys.path if "shared/workspaces" in pathname),
        None,
    )
    # BUG FIX: previously Path(None) raised a TypeError when no sys.path
    # entry matched; fail soft with None, consistent with the existing
    # not-found fall-through below.
    if shared_ws_src is None:
        return None
    shared_wksp = Path(shared_ws_src).parent
    # test data will be a few levels under shared_wksp
    for item in shared_wksp.rglob("location_files"):
        assert item.is_dir()
        return item
    return None
""" Tests to confirm our fake archive service functions as intended """
# pylint: disable=E0402, W0511
from datafetcher.locations_report_refactor import LocationsReportRefactor
from .fake_archive_service import FakeArchiveService
# TODO: move this to appropriate DF test module
# def test_gets_df_settings():
# capo = CapoConfig(profile=PROFILE)
# key = "edu.nrao.archive.datafetcher.DataFetcherSettings"
# field = "locatorServiceUrlPrefix"
# to_get = key + "." + field
# try:
# url = capo.get(to_get)
# except KeyError as k_err:
# raise MissingSettingsException(f'missing required setting "{field}"') from k_err
# assert "locator" in url
#
#
def test_service_returns_vla_eb_locations():
    """
    Does FakeArchiveService return the VLA execution block locations report
    in test_data?  (Docstring fixed: it previously said "calibrations".)
    :return:
    """
    found = False
    report = FakeArchiveService("uid://evla/execblock/123a-456-b-789z").get_locations_report()
    assert len(report.servers_report) == 2
    assert len(report.files_report["files"]) == 79
    expected_ngas_id = "17A-109_2017_02_11_T18_49_09.756.tar"
    rel_path = None
    # The expected file should be present, with relative_path == ngas_file_id.
    for file_spec in report.files_report["files"]:
        if file_spec["ngas_file_id"] == expected_ngas_id:
            found = True
            rel_path = file_spec["relative_path"]
            break
    assert found
    assert rel_path == expected_ngas_id
def test_service_returns_vlba_locations():
    """
    Does FakeArchiveService return the VLBA locations report in test_data?
    (Docstring fixed: it previously said "calibrations".)
    :return:
    """
    report = FakeArchiveService("uid://evla/vlba/uuid-du-jour31").get_locations_report()
    assert len(report.servers_report) == 1
    assert len(report.files_report["files"]) == 16
    expected_agg_size = 2140560000
    actual_agg_size = report.files_report["aggregate_size"]
    assert actual_agg_size == expected_agg_size
def test_service_returns_cal_locations():
    """
    Does FakeArchiveService return the calibrations locations report in
    test_data?
    :return:
    """
    report = FakeArchiveService("uid://evla/calibration/different-uuid").get_locations_report()
    assert len(report.servers_report) == 1
    assert len(report.files_report) == 2
    expected_subdir = "18B-265_2019_12_10_T00_00_59.203.tar"
    # At least one file in the report must live in the expected subdirectory.
    assert any(
        spec["subdirectory"] == expected_subdir
        for spec in report.files_report["files"]
    )
def test_service_returns_image_locations():
    """
    Does FakeArchiveService return the image locations report in test_data?
    :return:
    """
    report = FakeArchiveService("uid://evla/image/some-uuid").get_locations_report()
    assert isinstance(report, LocationsReportRefactor)
    assert len(report.servers_report) == 2
    assert len(report.files_report) == 2
    expected_file_id = "uid____evla_image_56a10be7-f1c2-4788-8651-6ecc5bfbc2f1.fits"
    # The expected FITS file must appear among the report's files.
    assert any(
        spec["ngas_file_id"] == expected_file_id
        for spec in report.files_report["files"]
    )
""" Unit tests for LocationsReportRefactor """
# pylint: disable=W0511, E0401, E0402
import pytest
from datafetcher.locations_report_refactor import (
LocationsReportRefactor,
ReportFileParseException,
UnknownLocatorException,
)
from .fake_archive_service import REPORT_FILES, get_test_data_dir
PROFILE = "docker"
def test_service_returns_expected_report():
    """
    Does FakeArchiveService return a valid locations report?
    :return:
    """
    locator = "uid://evla/image/7a546de2-ab1f-4915-a8dc-94f572f9215c"
    report = LocationsReportRefactor(
        ["--product-locator", locator, "--profile", PROFILE]
    )
    assert report.files_report is not None
    assert len(report.servers_report) == 2
    assert len(report.files_report) == 2
def test_empty_file_fails_as_expected():
    """
    When we attempt to create a locations report from a blank
    .json file, do we get the expected error?
    :return:
    """
    report_file = get_test_data_dir() / REPORT_FILES["empty"]
    with pytest.raises(ReportFileParseException):
        LocationsReportRefactor(
            ["--location-file", str(report_file), "--profile", PROFILE]
        )
def test_bad_json_fails_as_expected():
    """
    When we attempt to create a locations report from a badly-formed
    .json file, do we get the expected error?
    :return:
    """
    report_file = get_test_data_dir() / REPORT_FILES["not_json"]
    with pytest.raises(ReportFileParseException):
        LocationsReportRefactor(
            ["--location-file", str(report_file), "--profile", PROFILE]
        )
def test_bad_locator_fails_as_expected():
    """
    An invalid locator should throw UnknownLocatorException
    :return:
    """
    bogus_locator = "uid://this/is/b00000gus"
    with pytest.raises(UnknownLocatorException):
        LocationsReportRefactor(
            ["--product-locator", bogus_locator, "--profile", PROFILE]
        )
def test_missing_file_fails_as_expected():
    """
    If the specified location report file isn't there,
    locations report should throw a FNF
    :return:
    """
    missing = get_test_data_dir() / "nonexistent.json"
    with pytest.raises(FileNotFoundError):
        LocationsReportRefactor(
            ["--location-file", str(missing), "--profile", PROFILE]
        )
def test_file_returns_expected_report():
    """
    Can we get a valid locations report from a report file?
    :return:
    """
    report_file = get_test_data_dir() / REPORT_FILES["vla_eb"]
    report = LocationsReportRefactor(
        ["--location-file", str(report_file), "--profile", PROFILE]
    )
    assert len(report.servers_report) == 2
    assert len(report.files_report) == 2
# TODO: other location report tests from test_df_function, test_df_return_codes
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment