Skip to content
Snippets Groups Projects
Commit 42d6af88 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Replace old calls in e2e tests with modern ones, remove old modules

parent 4089166f
No related branches found
No related tags found
1 merge request!247Replace old calls in e2e tests with modern ones, remove old modules
Pipeline #1711 waiting for manual action
""" Custom error definitions for data-fetcher """
import logging
_LOG = logging.getLogger(__name__)
# pylint: disable=W1202
errors = [
(1, "NoProfileException", "no CAPO profile provided"),
(2, "MissingSettingsException", "missing required setting"),
(
3,
"LocationServiceTimeoutException",
"request to locator service timed out",
),
(
4,
"LocationServiceRedirectsException",
"too many redirects on locator service",
),
(
5,
"LocationServiceErrorException",
"catastrophic error on locator service",
),
(6, "NoLocatorException", "product locator not found"),
(7, "FileErrorException", "not able to open specified location file"),
(8, "NGASServiceTimeoutException", "request to NGAS timed out"),
(
9,
"NGASServiceRedirectsException",
"too many redirects on NGAS service",
),
(10, "NGASServiceErrorException", "catastrophic error on NGAS service"),
(11, "SizeMismatchException", "retrieved file not expected size"),
(12, "FileExistsError", "specified location file exists"),
(13, "FileNotFoundError", "target directory or file not found"),
(14, "NGASFetchError", "trouble retrieving files from NGAS"),
]
class DataFetcherException(Exception):
code: int
message: str
def define_datafetcher_exception(code, name, message):
result = type(name, (DataFetcherException,), {})
result.code, result.message = code, message
return result
# Define the exceptions
NoProfileException = define_datafetcher_exception(*errors[0])
MissingSettingsException = define_datafetcher_exception(*errors[1])
LocationServiceTimeoutException = define_datafetcher_exception(*errors[2])
LocationServiceRedirectsException = define_datafetcher_exception(*errors[3])
LocationServiceErrorException = define_datafetcher_exception(*errors[4])
NoLocatorException = define_datafetcher_exception(*errors[5])
FileErrorException = define_datafetcher_exception(*errors[6])
NGASServiceTimeoutException = define_datafetcher_exception(*errors[7])
NGASServiceRedirectsException = define_datafetcher_exception(*errors[8])
NGASServiceErrorException = define_datafetcher_exception(*errors[9])
SizeMismatchException = define_datafetcher_exception(*errors[10])
FileExistsError = define_datafetcher_exception(*errors[11])
FileNotFoundError = define_datafetcher_exception(*errors[12])
NGASFetchError = define_datafetcher_exception(*errors[13])
TERMINAL_ERROR_CODES = "Return Codes:\n"
for error_code, exception_name, message in errors:
TERMINAL_ERROR_CODES += f"\t{error_code}: {message}\n"
# -*- coding: utf-8 -*-
"""
This is a home for assorted utilities used in the datafetcher,
its modules, and its tests.
"""
import json
from enum import Enum
import logging
import os
import pathlib
import time
from typing import Callable, Dict
# pylint:disable=C0301, C0303, C0415, E0401, E0402, R0903, W0212, W1202, W0404, W0621, W1203
from pycapo import CapoConfig
from productfetcher.errors import (
NoProfileException,
MissingSettingsException,
NGASServiceErrorException,
SizeMismatchException,
)
LOG_FORMAT = "%(module)s.%(funcName)s, %(lineno)d: %(message)s"
MAX_TRIES = 10
SLEEP_INTERVAL_SECONDS = 1
FILE_SPEC_KEYS = [
"ngas_file_id",
"subdirectory",
"relative_path",
"checksum",
"checksum_type",
"version",
"size",
"server",
]
SERVER_SPEC_KEYS = ["server", "location", "cluster", "retrieve_method"]
# Prologue and epilogue for the command line parser.
_PROLOGUE = """Retrieve a product (a science product or an ancillary product)
from the NRAO archive, either by specifying the product's locator or by
providing the path to a product locator report."""
# This is a dictionary of required CAPO settings
# and the attribute names we'll store them as.
REQUIRED_SETTINGS = {
"EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.LOCATORSERVICEURLPREFIX": "locator_service_url",
"EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.EXECUTIONSITE": "execution_site",
"EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.DEFAULTTHREADSPERHOST": "threads_per_host",
"EDU.NRAO.ARCHIVE.WORKFLOW.CONFIG.REQUESTHANDLERSETTINGS.DOWNLOADDIRECTORY": "download_dir",
}
logger = logging.getLogger(__name__)
def path_is_accessible(path: pathlib.Path) -> bool:
"""
Is this path readable, executable, and writable?
:param path: a file or directory
:return:
"""
if not path.exists():
return False
can_access = os.access(path, os.F_OK)
can_access = can_access and path.is_dir()
can_access = can_access and os.access(path, os.R_OK)
can_access = can_access and os.access(path, os.W_OK)
can_access = can_access and os.access(path, os.X_OK)
return can_access
def get_capo_settings() -> Dict[str, str]:
"""
Get the required CAPO settings for datafetcher for the provided profile
(prod, test, etc). Throws exception if it can't find one of them.
:return: a bunch of settings
"""
profile = os.environ["CAPO_PROFILE"]
if not profile:
raise NoProfileException("CAPO_PROFILE required; none provided")
capo = CapoConfig(profile=profile)
result = {}
for setting in REQUIRED_SETTINGS:
setting = setting.upper()
try:
result[REQUIRED_SETTINGS[setting]] = capo[setting]
except KeyError as k_err:
raise MissingSettingsException(
f'missing required setting "{setting}" with profile "{profile}"'
) from k_err
return result
def get_file_info_from_json_file(json_file: pathlib.Path) -> Dict:
"""
Plumb the JSON out of this locations file.
:param json_file: file to read
:return:
"""
with open(json_file, "r") as content:
file_info = json.loads(content.read())
return file_info
class Retryer:
"""
Retry executing a function, or die trying
"""
def __init__(self, func: Callable, max_tries: int, sleep_interval: int):
self.func = func
self.num_tries = 0
self.max_tries = max_tries
self.sleep_interval = sleep_interval
self._logger = logger
self.complete = False
def retry(self, *args):
"""
Try something a specified number of times.
Die if it doesn't work after N tries.
:param args:
:return:
"""
while self.num_tries < self.max_tries and not self.complete:
self.num_tries += 1
exc = None
try:
success = self.func(args)
if success:
self.complete = True
else:
if self.num_tries < self.max_tries:
self._logger.debug(
"iteration #{}: {}; trying again after {} "
"seconds....".format(self.num_tries, exc, self.sleep_interval)
)
time.sleep(self.sleep_interval)
else:
raise NGASServiceErrorException(
"FAILURE after {} attempts".format(self.num_tries)
)
except (NGASServiceErrorException, SizeMismatchException) as exc:
if self.num_tries < self.max_tries:
self._logger.debug(
"{}; trying again after {} seconds....".format(exc, self.sleep_interval)
)
time.sleep(self.sleep_interval)
else:
self._logger.error("FAILURE after {} attempts".format(self.num_tries))
raise exc
class Location(Enum):
"""
Where the files live
"""
DSOC = "DSOC"
NAASC = "NAASC"
def __str__(self):
return str(self.value)
class Cluster(Enum):
"""
Which cluster the files are on
"""
DSOC = "DSOC"
NAASC = "NAASC"
def __str__(self):
return str(self.value)
class ExecutionSite(Enum):
"""Where this code is executing"""
DSOC = "DSOC"
NAASC = "NAASC"
def __str__(self):
if self.value:
return str(self.value)
return "local_test"
class RetrievalMode(Enum):
"""How we're retrieving a file: via streaming or via direct copy (plugin)"""
STREAM = "stream"
COPY = "copy"
class CLIParam(Enum):
"""Codifies datafetcher's various command-line parameters"""
SPL = "--product-locator"
FILE = "--location-file"
DRY = "--dry-run"
FORCE = "--force"
DIRECT_COPY = "--direct-copy"
STREAMING = "--streaming"
LUSTRE = "lustre"
ASDM_BIN_SUBDIR = "ASDMBinary"
......@@ -15,11 +15,7 @@ from unittest.mock import patch
import pytest
from productfetcher.interfaces import LocationReport
from productfetcher.locations import LocationReportSchema
from productfetcher.utilities import (
get_file_info_from_json_file,
ASDM_BIN_SUBDIR,
)
from productfetcher.locations import FileLocator
logger = logging.getLogger(__name__)
......@@ -28,6 +24,8 @@ logger.addHandler(logging.StreamHandler(sys.stdout))
REALLY_FETCH = False
ASDM_BIN_SUBDIR = "ASDMBinary"
# TODO: test optional command-line parameters?
......@@ -238,8 +236,7 @@ def test_fetch_vla_eb_from_locator(tmpdir, resource_path_root):
"""
locations_file = resource_path_root / "location_files" / "13B-014.json"
locations_dict = get_file_info_from_json_file(locations_file)
locations_rpt = LocationReportSchema().load(locations_dict)
locations_rpt = FileLocator(locations_file).locate()
cmd_line_args = ["productfetcher", "--product-locator", "im_a_little_teapot"]
output_dir = Path(tmpdir)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment