
fixing download capability

Merged Charlotte Hausman requested to merge notification_service_work into main
6 files  + 679  − 0
""" A locations report is produced by the archive service; you give
it a product locator and it returns a dictionary of details on how
to retrieve the product's files from long term storage (NGAS): this
class handles fetching the report from the service or reading it from
a file, and has utilities to manipulate the report.
"""
# pylint: disable=C0301, E0401, E0402, W1203
import copy
import http
import json
import logging
from json import JSONDecodeError
from typing import Dict, List
import requests
from pycapo import CapoConfig
from .errors import (
    LocationServiceTimeoutException,
    LocationServiceRedirectsException,
    LocationServiceErrorException,
    NoLocatorException,
    MissingSettingsException,
)
from .utilities import Cluster, RetrievalMode, validate_file_spec, get_arg_parser
# pylint: disable=C0103, R0902, R0903, R0914, W0703, W1203
logger = logging.getLogger(__name__)
REQUIRED_SETTINGS = {
    "EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.LOCATORSERVICEURLPREFIX": "locator_service_url",
    "EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.EXECUTIONSITE": "execution_site",
}


class LocationsReportRefactor:
    """Builds a location report"""

    def __init__(self, args: List[str]):
        namespace = get_arg_parser().parse_args(args)
        try:
            self.capo_config = CapoConfig(profile=namespace.profile)
        except Exception as exc:
            raise MissingSettingsException("Capo profile is required") from exc
        self.product_locator = namespace.product_locator
        self.location_file = namespace.location_file
        self.profile = namespace.profile
        if not self.product_locator and not self.location_file:
            raise NoLocatorException("either product locator or report file must be specified")
        self._run()

    def _run(self):
        self.files_report = self._get_files_report()
        self.servers_report = self._get_servers_report()

    def _get_files_report(self):
        """Given a product locator or a path to a location file, return a
        location report: an object describing the files that make up the product
        and where to get them from.

        If neither argument was provided, the constructor has already raised a
        NoLocatorException; if both are given (for some reason), the location
        file takes precedence.

        :return: location report (from file or service, as a dict)
        """
        result = dict()
        try:
            # the location file takes precedence when both arguments are given
            if self.location_file is not None:
                result = self._get_location_report_from_file()
            elif self.product_locator is not None:
                result = self._get_location_report_from_service()
            return self._add_retrieve_method_field(result)
        except JSONDecodeError as js_err:
            logger.error(f"Unable to parse {self.location_file}")
            raise ReportFileParseException from js_err

    def _get_location_report_from_file(self) -> Dict[str, str]:
        """
        Read a file at a user-provided path to pull in the location report.

        :return: the location report parsed from the file
        """
        logger.debug(f'fetching files from report "{self.location_file}"')
        try:
            with open(self.location_file) as to_read:
                result = json.load(to_read)
                return result
        except FileNotFoundError as err:
            logger.error(f"{err}")
            raise
        except JSONDecodeError as js_err:
            logger.error(f"Unable to parse {self.location_file}")
            raise ReportFileParseException from js_err
        except Exception as exc:
            logger.error(f"Problem getting location report from {self.location_file}: {exc}")
            raise

    def _get_location_report_from_service(self):
        """Use 'requests' to fetch the location report from the locator service.

        :return: location report (from locator service, in JSON)
        """
        url = self.capo_config.get(
            "edu.nrao.archive.datafetcher.DataFetcherSettings.locatorServiceUrlPrefix"
        )
        logger.debug(f"fetching report from {url} for {self.product_locator}")

        # this is needed to prevent SSL errors when tests are run
        # inside a Docker container
        requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ":HIGH:!DH:!aNULL"
        requests.Session().mount(url, adapter=requests.adapters.HTTPAdapter())

        try:
            response = requests.get(url, params={"locator": self.product_locator})
        except requests.exceptions.Timeout as exc_to:
            raise LocationServiceTimeoutException() from exc_to
        except requests.exceptions.TooManyRedirects as exc_re:
            raise LocationServiceRedirectsException() from exc_re
        except requests.exceptions.RequestException as ex:
            raise LocationServiceErrorException(ex) from ex
        except Exception as exc:
            # re-raise here; otherwise 'response' would be unbound below
            logger.error(f"{exc}")
            raise

        if response.status_code == http.HTTPStatus.OK:
            return response.json()
        if response.status_code == http.HTTPStatus.NOT_FOUND:
            raise UnknownLocatorException(f'locator "{self.product_locator}" not found')
        raise LocationServiceErrorException(f"locator service returned {response.status_code}")

    def _add_retrieve_method_field(self, files_report: Dict):
        """Add a field to the files report indicating whether we can do a
        direct copy or have to rely on streaming: this is something the
        location service itself doesn't know, because it depends on which
        site the data-fetcher is running on, which site has the data, and
        whether the NGAS cluster supports direct copy."""
        dsoc_cluster = Cluster.DSOC
        exec_site = self.capo_config.getstring(
            "edu.nrao.archive.datafetcher.DataFetcherSettings.executionSite"
        )
        for file_spec in files_report["files"]:
            validate_file_spec(file_spec, False)
            server = file_spec["server"]
            location = server["location"]
            if server["cluster"] == dsoc_cluster.value and (
                location == exec_site or location == str(exec_site)
            ):
                server["retrieve_method"] = RetrievalMode.COPY
            else:
                server["retrieve_method"] = RetrievalMode.STREAM
        return files_report

    def _get_servers_report(self) -> Dict:
        """The location report we get back looks like this, for each file:

            {"ngas_file_id": "17B-197_2018_02_19_T15_59_16.097.tar",
             "subdirectory": "17B-197.sb34812522.eb35115211.58168.58572621528",
             "relative_path": "17B-197_2018_02_19_T15_59_16.097.tar",
             "checksum": "-1848259250",
             "checksum_type": "ngamsGenCrc32",
             "version": 1,
             "size": 108677120,
             "server": {"server": "nmngas01.aoc.nrao.edu:7777",
                        "location": "DSOC",
                        "cluster": "DSOC"}}

        Re-organize it to group files under servers so it is more useful.
        """
        result = {}
        for file_spec in self.files_report["files"]:
            validate_file_spec(file_spec, True)
            new_f = copy.deepcopy(file_spec)
            del new_f["server"]
            server = file_spec["server"]
            server_host = server["server"]
            if server_host not in result:
                result[server_host] = dict()
                result[server_host]["location"] = server["location"]
                result[server_host]["cluster"] = server["cluster"]
                result[server_host]["retrieve_method"] = server["retrieve_method"]
                result[server_host]["files"] = list()
            result[server_host]["files"].append(new_f)
        return result


class ReportFileParseException(Exception):
    """Throw this when we're unable to parse a .json file"""


class UnknownLocatorException(Exception):
    """Throw this when we get a product locator we don't know how to handle"""