diff --git a/apps/cli/executables/datafetcher/README.md b/apps/cli/executables/datafetcher/README.md
index b2cc894843867d2fa2ab8c1a76de9c55036411ac..3a46f7229fcbd412dd9c08992c40dc741641268e 100644
--- a/apps/cli/executables/datafetcher/README.md
+++ b/apps/cli/executables/datafetcher/README.md
@@ -15,17 +15,23 @@ This is intended to be a library wrapped in a command line interface.
 usage: datafetcher [-h]
                    (--product-locator PRODUCT_LOCATOR | --location-file LOCATION_FILE)
                    [--dry-run] [--output-dir OUTPUT_DIR] [--verbose]
-                   [--profile PROFILE]
+                   [--profile PROFILE] [--force]
 
 Retrieve a product (a science product or an ancillary product) from the NRAO archive,
 either by specifying the product's locator or by providing the path to a product
 locator report.
 
+If any of the files to be retrieved is already present in the target location,
+it will not be overwritten unless the --force argument is supplied; instead, an
+informative error message is displayed.
+
 Optional Arguments:
   --dry-run             dry run, do not fetch product
   --output-dir OUTPUT_DIR
                         output directory, default current directory
   --verbose             make a lot of noise
+  --force               if a file to be fetched already exists in the target
+                        location, overwrite it
   --profile PROFILE     CAPO profile to use
 
 Return Codes:
diff --git a/apps/cli/executables/datafetcher/src/datafetcher/commands.py b/apps/cli/executables/datafetcher/src/datafetcher/commands.py
deleted file mode 100755
index e8107449bcd41d079fee396db992f7c36f69f57f..0000000000000000000000000000000000000000
--- a/apps/cli/executables/datafetcher/src/datafetcher/commands.py
+++ /dev/null
@@ -1,116 +0,0 @@
-#!/usr/bin/env/python
-# -*- coding: utf-8 -*-
-
-""" Module for the command line interface to data-fetcher. """
-
-import logging
-import sys
-import traceback
-
-from .errors import NoLocatorException, \
-    NGASServiceErrorException, exception_to_error, terminal_exception
-from .locations_report import LocationsReport
-from .product_fetchers import ParallelProductFetcher
-from .utilities import get_arg_parser, get_capo_settings, FlexLogger
-
-
-class DataFetcher:
-    '''
-    TO EXECUTE ALL DF TESTS: from datafetcher/, just run pytest
-
-    example command line that should work with the correct local profile:
-    datafetcher --profile local --output-dir ~/Downloads/
-    --product-locator \
-    uid://evla/execblock/93e1c0cd-76de-4e65-a3f2-d5fe55f386d8 \
-    --verbose
-
-    local.properties must have:
-    - edu.nrao.archive.workflow.config
-        .StartupSettings.temporaryDataDirectory pointing to a locally
-        writable temp dir, e.g., /var/tmp
-    - edu.nrao.archive.workflow.config.DeliverySettings.hostname must point
-        to local computer
-    - execution_site must NOT be DSOC or NAASC
-
-    '''
-
-    def __init__(self, args, settings):
-        self.args = args
-        self.settings = settings
-
-        verbose = args and args.verbose
-        try:
-            self._LOG = FlexLogger(self.__class__.__name__, args.output_dir, verbose)
-            self.logfile = self._LOG.logfile
-            self.locations_report = self._get_locations()
-            self.servers_report = self.locations_report.servers_report
-        except (NoLocatorException, FileNotFoundError, PermissionError) as exc:
-            self._terminal_exception(exc)
-        except TypeError as exc:
-            self._LOG.error('TODO: handle TypeError')
-            self._terminal_exception(exc)
-        except Exception as exc:
-            self._LOG.error(
-                f'>>> throwing unexpected {type(exc)} during init: {exc}')
-            self._terminal_exception(exc)
-
-    def run(self):
-        """
-        launch the fetcher
-        :return:
-        """
-
-        try:
-            return ParallelProductFetcher(
-                self.args, self.settings, self._LOG,
-                self.servers_report).run()
-        except 
(NGASServiceErrorException, FileExistsError) as exc: - self._terminal_exception(exc) - except AttributeError as a_err: - self._LOG.error(f'>>> throwing AttributeError during run: {a_err}') - self._terminal_exception(a_err) - except Exception as exc: - self._LOG.error( - f'>>> throwing unexpected exception during run: {exc}') - self._terminal_exception(exc) - - def _get_locations(self): - try: - return LocationsReport(self._LOG, self.args, self.settings) - except NoLocatorException as exc: - self._terminal_exception(exc) - - def _terminal_exception(self, exception: Exception): - ''' report exception, then throw in the towel - ''' - errorno = exception_to_error(exception) - try: - self._LOG.debug(traceback.format_exc()) - exc_type = type(exception) - self._LOG.error('terminal_exception') - self._LOG.error(f'{exc_type}: {str(exception)}') - except Exception as exc: - logging.error(exc) - finally: - sys.exit(errorno.value) - - -def main(): - ''' this will be executed when fetcher is launched - from the command line - ''' - try: - parser = get_arg_parser() - args = parser.parse_args() - settings = get_capo_settings(args.profile) - datafetcher = DataFetcher(args, settings) - datafetcher.run() - except (NGASServiceErrorException, FileExistsError) as ex: - terminal_exception(ex) - except Exception as ex: - logging.error(f'>>> some other kind of exception during main: {ex}') - terminal_exception(ex) - - -if __name__ == '__main__': - main() diff --git a/apps/cli/executables/datafetcher/src/datafetcher/datafetcher.py b/apps/cli/executables/datafetcher/src/datafetcher/datafetcher.py new file mode 100755 index 0000000000000000000000000000000000000000..06f9cfdd50eafc82007f7e4b006c19483f5937f3 --- /dev/null +++ b/apps/cli/executables/datafetcher/src/datafetcher/datafetcher.py @@ -0,0 +1,178 @@ +#!/usr/bin/env/python +# -*- coding: utf-8 -*- + +''' Module for the command line interface to data-fetcher. 
''' + +import logging +import sys +from argparse import Namespace +from pathlib import Path + +from datafetcher.project_fetcher import ParallelFetcher +from datafetcher.return_codes import ReturnCode + +from .locations_report import LocationsReport +from .utilities import get_arg_parser, get_capo_settings, FlexLogger, \ + path_is_accessible + +# pylint: disable=W1203, R0902, R0903 + +_APPLICATION_NAME = 'datafetcher' +_VALID_PROFILES = ['nmtest', 'dsoc-test', 'dsoc-dev', + 'nmprod', 'dsoc-prod', 'local'] + +class DataFetcherRefactor: + ''' + TO EXECUTE ALL DF TESTS: from datafetcher/, just run pytest + + example command line that should work with the correct local profile + and a current product locator (these are changed whenever the + database is rebuilt: + datafetcher --profile local --output-dir ~/Downloads/ + --product-locator \ + uid://evla/execblock/93e1c0cd-76de-4e65-a3f2-d5fe55f386d8 \ + --verbose + + local.properties must have: + - edu.nrao.archive.workflow.config + .StartupSettings.temporaryDataDirectory pointing to a locally + writable temp dir, e.g., /var/tmp + - edu.nrao.archive.workflow.config.DeliverySettings.hostname must point + to local computer + - execution_site must NOT be DSOC or NAASC + + ''' + + def __init__(self, args: Namespace, settings: dict): + self.usage = self._build_usage_message() + if args is None or settings is None: + self._exit_with_error(ReturnCode.MISSING_SETTING) + self.args = args + self.settings = settings + self.verbose = self.args.verbose + + # required arguments + self.output_dir = args.output_dir + if self.output_dir is None: + self._exit_with_error(ReturnCode.MISSING_SETTING) + + output_dir = Path(self.output_dir) + if not output_dir.is_dir()\ + or not path_is_accessible(output_dir): + logging.error(f'output location {self.output_dir} inaccessible ' + f'or not found') + self._exit_with_error(ReturnCode.MISSING_SETTING) + + self._LOG = FlexLogger(self.__class__.__name__, + self.output_dir, self.verbose) + if args.location_file is not None: + if args.product_locator is not None: + self._LOG.error('required: location file OR product locator ' + '-- not both') + self._exit_with_error(ReturnCode.MISSING_SETTING) + self.location_file = args.location_file + elif args.product_locator is not None: + self.product_locator = args.product_locator + else: + self._LOG.error('you must specify either a location file or a ' + 'product locator') + self._exit_with_error(ReturnCode.MISSING_SETTING) + + self.profile = args.profile + if self.profile is None or not self.profile in _VALID_PROFILES: + self._exit_with_error(ReturnCode.MISSING_PROFILE) + + # optional arguments + self.is_dry = args.dry_run + self.force = args.force + self.verbose = args.verbose or False + + try: + self.locations_report = self._get_locations() + self.servers_report = self.locations_report.servers_report + except SystemExit as exc: + self._LOG.error(f'{exc}') + if args.location_file: + self._exit_with_error(ReturnCode.CANNOT_OPEN_LOCATION_FILE) + else: + self._exit_with_error(ReturnCode.PRODUCT_LOCATOR_NOT_FOUND) + raise + + except Exception as exc: + self._LOG.error( + f'>>> throwing unexpected {type(exc)} during init: {exc}') + raise + + def _exit_with_error(self, return_code: ReturnCode): + print(self.usage) + sys.exit(return_code.value) + + + @staticmethod + def _build_usage_message() -> str: + usage_str = 'Usage:' + usage_str += """\tdatafetcher [-h] + (--product-locator PRODUCT_LOCATOR | --location-file LOCATION_FILE) + [--dry-run] [--output-dir OUTPUT_DIR] [-v, --verbose] + 
[--profile PROFILE] [--force]\n"""
+        usage_str += "\n\t\tReturn codes:\n"
+        for return_code in ReturnCode:
+            usage_str += f"\n\t\t\t{return_code.value['code']}: " \
+                         f"{return_code.value['text']}"
+        return usage_str
+
+    def run(self):
+        """
+        launch the fetcher
+        :return: 0 on success
+        """
+
+        try:
+            return ParallelFetcher(
+                self.args, self.settings, self._LOG,
+                self.servers_report).run()
+        except SystemExit as exc:
+            self._LOG.error(f'{exc}')
+            raise
+        except Exception as exc:
+            self._LOG.error(
+                f'>>> throwing unexpected exception during run: {exc}')
+            raise
+
+    def _get_locations(self):
+        try:
+            capo_settings = get_capo_settings(self.profile)
+            return LocationsReport(self._LOG,
+                                   self.args,
+                                   capo_settings)
+        except Exception as exc:
+            self._LOG.error(f'{exc}')
+            if hasattr(self, 'location_file'):
+                self._exit_with_error(ReturnCode.CANNOT_OPEN_LOCATION_FILE)
+            elif hasattr(self, 'product_locator'):
+                self._exit_with_error(ReturnCode.PRODUCT_LOCATOR_NOT_FOUND)
+            else:
+                # should never happen; we've already checked
+                self._exit_with_error(ReturnCode.MISSING_SETTING)
+
+
+def main():
+    ''' this will be executed when fetcher is launched
+        from the command line
+    '''
+
+    parser = get_arg_parser()
+    try:
+        args = parser.parse_args()
+    except SystemExit as exc:
+        return exc.code
+    except Exception as exc:
+        logging.error(f'{exc}')
+        raise
+    settings = get_capo_settings(args.profile)
+    datafetcher = DataFetcherRefactor(args, settings)
+    return datafetcher.run()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/apps/cli/executables/datafetcher/src/datafetcher/locations_report.py b/apps/cli/executables/datafetcher/src/datafetcher/locations_report.py
index 5788dc67ba404c4e26816d3397e9e3cc30d94cd8..39bfacc6e46f3f8d3c1a0c6e48494f2e0b4ccbdd 100644
--- a/apps/cli/executables/datafetcher/src/datafetcher/locations_report.py
+++ b/apps/cli/executables/datafetcher/src/datafetcher/locations_report.py
@@ -126,10 +126,6 @@ class LocationsReport:
         :return: location report (from file, in JSON)
         """
         result = dict()
-        if self.product_locator is None and self.location_file is None:
-            raise ValueError(
-                'product_locator or location_file must be provided; '
-                'neither were')
         if self.location_file is not None:
             result = self._get_location_report_from_file()
         if self.product_locator is not None:
@@ -143,9 +139,13 @@
         self._LOG.debug(f'fetching files from report "{self.location_file}"')
 #        try:
-        with open(self.location_file) as to_read:
-            result = json.load(to_read)
-            return result
+        try:
+            with open(self.location_file) as to_read:
+                result = json.load(to_read)
+                return result
+        except FileNotFoundError as err:
+            self._LOG.error(f'{err}')
+            raise
 
     def _get_location_report_from_service(self):
         """ Use 'requests' to fetch the location report from the locator service.
@@ -171,6 +171,8 @@ class LocationsReport: raise LocationServiceRedirectsException() except requests.exceptions.RequestException as ex: raise LocationServiceErrorException(ex) + except Exception as exc: + self._LOG.error(f'{exc}') if response.status_code == http.HTTPStatus.OK: return response.json() diff --git a/apps/cli/executables/datafetcher/src/datafetcher/product_fetchers.py b/apps/cli/executables/datafetcher/src/datafetcher/project_fetcher.py similarity index 68% rename from apps/cli/executables/datafetcher/src/datafetcher/product_fetchers.py rename to apps/cli/executables/datafetcher/src/datafetcher/project_fetcher.py index f3f30841581f398ac849f95c60fd5094d50cdaed..f453da9b6ea33d447f3883adcbf1890c9f820223 100644 --- a/apps/cli/executables/datafetcher/src/datafetcher/product_fetchers.py +++ b/apps/cli/executables/datafetcher/src/datafetcher/project_fetcher.py @@ -3,18 +3,20 @@ """ Implementations of assorted product fetchers """ import copy -import os +import sys from argparse import Namespace from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Dict -from .errors import NGASServiceErrorException +from datafetcher.errors import NGASServiceErrorException +from datafetcher.return_codes import ReturnCode + from .file_retrievers import NGASFileRetriever from .utilities import FlexLogger -class BaseProductFetcher: +class BaseFetcher: """ This is a base class for fetchers. """ def __init__(self, args: Namespace, settings: Dict, logger: FlexLogger, @@ -31,8 +33,8 @@ class BaseProductFetcher: self.num_files_retrieved = 0 def retrieve_files(self, server, retrieve_method, file_specs): - ''' this is the part where we actually fetch the files - ''' + ''' this is the part where we actually fetch the files ''' + retriever = NGASFileRetriever(self.args, self._LOG) num_files = len(file_specs) count = 0 @@ -49,18 +51,18 @@ class BaseProductFetcher: return num_files -class SerialProductFetcher(BaseProductFetcher): - """ Pull the files out, one right after another; - don't try to be clever about it. - """ - # TODO some fine day: add datafetcher_tests for this IFF it will be used in - # production +class SerialFetcher(BaseFetcher): + ''' Pull the files out, one right after another; + don't try to be clever about it. + + ''' def __init__(self, args: Namespace, settings: Dict, logger: FlexLogger, servers_report: Dict): super().__init__(args, settings, logger, servers_report) def run(self): + ''' fetch 'em ''' self._LOG.debug('writing to {}'.format(self.output_dir)) self._LOG.debug('dry run: {}'.format(self.dry_run)) self._LOG.debug(f'force overwrite: {self.force_overwrite}') @@ -70,25 +72,24 @@ class SerialProductFetcher(BaseProductFetcher): self.servers_report[server]['files']) -class ParallelProductFetcher(BaseProductFetcher): - """ Pull the files out in parallel; try to be clever about it. - """ - +class ParallelFetcher(BaseFetcher): + ''' Pull the files out in parallel; try to be clever about it. 
''' - def __init__(self, args: Namespace, settings: Dict, logger: FlexLogger, - servers_report: Dict): + def __init__(self, args: Namespace, settings: dict, logger: FlexLogger, + servers_report: dict): super().__init__(args, settings, logger, servers_report) self.num_files_expected = self._count_files_expected() self.bucketized_files = self._bucketize_files() def _count_files_expected(self): + ''' determine how many files we expect to retrieve ''' count = 0 for server in self.servers_report: count += len(self.servers_report[server]['files']) return count def _bucketize_files(self): - """ Takes the servers report and splits it up into a list of buckets, + ''' Takes the servers report and splits it up into a list of buckets, each of which a single thread will handle. There will be X * Y buckets, where X is the number of servers and Y is the 'threads per host', and the files for a given server will be distributed among the buckets @@ -97,8 +98,9 @@ class ParallelProductFetcher(BaseProductFetcher): Basically what we are doing here is splitting up the work among the threads we'll be creating and creating a list of work for each thread to do. - """ + ''' + threads_per_host = int(self.settings['threads_per_host']) result = list() for server in self.servers_report: # Setup the 'buckets', one per server. @@ -106,12 +108,11 @@ class ParallelProductFetcher(BaseProductFetcher): 'retrieve_method': self.servers_report[server]['retrieve_method'], 'files': list()} - buckets = [copy.deepcopy(bucket) for x in - range(int(self.settings['threads_per_host']))] + buckets = [copy.deepcopy(bucket) for _ in range(threads_per_host)] # Spread the files for a given server around its buckets. i = 0 for file_spec in self.servers_report[server]['files']: - list_number = i % int(self.settings['threads_per_host']) + list_number = i % threads_per_host buckets[list_number]['files'].append(file_spec) i += 1 # Trim out every bucket with no files, add the rest to the result. @@ -132,37 +133,48 @@ class ParallelProductFetcher(BaseProductFetcher): def run(self): ''' Fetch all the files ''' + if self.args.dry_run: + self._LOG.debug('This is a dry run; files will not be fetched') + return 0 + with ThreadPoolExecutor() as executor: results = executor.map(self.fetch_bucket, self.bucketized_files) try: - for future in as_completed(results): + futures = as_completed(results) + for future in futures: self.num_files_retrieved += future.result() if self.num_files_retrieved != self.num_files_expected: self._LOG.error( f'{self.num_files_expected} files expected, ' f'but only {self.num_files_retrieved} retrieved') - raise NGASServiceErrorException - return self.retrieved - except (FileExistsError, NGASServiceErrorException) as n_exc: - raise n_exc + self._exit_with_error(ReturnCode.NGAS_FETCH_ERROR) + + # successful retrieval + return 0 + except (FileExistsError, NGASServiceErrorException) as exc: + self._LOG.error(f'{exc}') + self._exit_with_error(ReturnCode.NGAS_FETCH_ERROR) except AttributeError: - # This error may -- but doesn't always -- occur after all files - # actually -have- been retrieved. TODO some fine day: why? 
- for dirname, dirnames, _ in os.walk( - self.args.output_dir): - if dirnames: - # we can expect one subdir: the external_name associated - # with the product locator - to_walk = Path(dirname) / dirnames[0] - for dname, dnames, files in os.walk(to_walk): - if self.num_files_expected <= len(files): - self.num_files_retrieved += len(files) - break - if self.num_files_expected >= self.num_files_retrieved: - break - if self.num_files_retrieved < self.num_files_expected: - raise NGASServiceErrorException( + # (This error sometimes gets thrown after all files + # actually -have- been retrieved. I blame the NGAS API.) + + output_path = Path(self.args.output_dir) + files = [file for file in output_path.rglob('*') + if not file.is_dir() + and not str(file).endswith('.json') + and not str(file).endswith('.log') + ] + if len(files) < self.num_files_expected: + self._LOG.error( f'{self.num_files_expected} files expected, but only ' f'{self.num_files_retrieved} found') + self._exit_with_error(ReturnCode.CATASTROPHIC_REQUEST_ERROR) + + return 0 + + except Exception as exc: + self._LOG.error(f'{exc}') + self._exit_with_error(ReturnCode.NGAS_FETCH_ERROR) - return self.retrieved + def _exit_with_error(self, return_code: ReturnCode): + sys.exit(return_code.value['code']) diff --git a/apps/cli/executables/datafetcher/src/datafetcher/return_codes.py b/apps/cli/executables/datafetcher/src/datafetcher/return_codes.py new file mode 100644 index 0000000000000000000000000000000000000000..4e4629863a5f426b4e0b7e34fa7a9016de79659d --- /dev/null +++ b/apps/cli/executables/datafetcher/src/datafetcher/return_codes.py @@ -0,0 +1,17 @@ +''' data-fetcher return codes as specified in README and usage string ''' + +from enum import Enum + + +class ReturnCode(Enum): + SUCCESS = {'code': 0, 'text' : 'success'} + MISSING_PROFILE = {'code': 1, 'text' : 'no CAPO profile provided'} + MISSING_SETTING = {'code': 2, 'text' : 'missing required setting'} + LOCATOR_SERVICE_TIMEOUT = {'code': 3, 'text' : 'request to locator service timed out'} + TOO_MANY_SERVICE_REDIRECTS = {'code': 4, 'text' : 'too many redirects on locator service'} + CATASTROPHIC_REQUEST_ERROR = {'code': 5, 'text' : 'catastrophic error on request service'} + PRODUCT_LOCATOR_NOT_FOUND = {'code': 6, 'text' : 'product locator not found'} + CANNOT_OPEN_LOCATION_FILE = {'code': 7, 'text' : 'not able to open specified location file'} + NGAS_FETCH_ERROR = {'code': 8, 'text' : 'error fetching file from NGAS server'} + SIZE_MISMATCH = {'code': 9, 'text' : 'retrieved file not expected size'} + diff --git a/apps/cli/executables/datafetcher/src/datafetcher/utilities.py b/apps/cli/executables/datafetcher/src/datafetcher/utilities.py index 880f0cedab2cde8047ce26e5f31e6dee8495af01..b79863d613b325cdef8677fd9b1e9e6376dad602 100644 --- a/apps/cli/executables/datafetcher/src/datafetcher/utilities.py +++ b/apps/cli/executables/datafetcher/src/datafetcher/utilities.py @@ -194,7 +194,7 @@ class FlexLogger(): ''' if class_name is None: raise MissingSettingsException('class name is required') - log_pathname = f'{class_name}_{str(time())}.log' + log_pathname = f'{output_dir}/{class_name}_{str(time())}.log' try: self.logfile = pathlib.Path(output_dir, log_pathname) diff --git a/apps/cli/executables/datafetcher/test/test_datafetcher.py b/apps/cli/executables/datafetcher/test/test_datafetcher.py index 73cb4e551b8072fc848aa0137b8efa018d3e3d07..2a27eb41fdee42da6b566f0f5c8089ad6cf49efd 100644 --- a/apps/cli/executables/datafetcher/test/test_datafetcher.py +++ 
b/apps/cli/executables/datafetcher/test/test_datafetcher.py @@ -1,701 +1,757 @@ -""" datafetcher unit tests """ - +''' Unit tests for data-fetcher. ''' +import logging import os -import subprocess import tempfile from pathlib import Path -from typing import List -from unittest.mock import MagicMock import pytest +from datafetcher.datafetcher import DataFetcherRefactor, ReturnCode +from datafetcher.utilities import get_arg_parser, ProductLocatorLookup, \ + ExecutionSite, RetrievalMode, Location, Cluster +from pycapo import CapoConfig -from datafetcher.commands import DataFetcher -from datafetcher.errors import Errors -from datafetcher.locations_report import LocationsReport -from datafetcher.utilities import get_arg_parser, ExecutionSite, \ - RetrievalMode, FlexLogger, ProductLocatorLookup - -from .testing_utils import TEST_PROFILE, LOCATION_REPORTS, \ - get_locations_report, get_locations_file, \ - get_mini_locations_file, find_newest_fetch_log_file, get_test_data_dir, \ - get_metadata_db_settings, get_test_capo_settings +from .testing_utils import get_test_capo_settings, get_metadata_db_settings, \ + TEST_PROFILE, get_mini_locations_file, get_locations_file, LOCATION_REPORTS -_VLA_SMALL_KEY = 'VLA_SMALL_EB' -_FETCH_COMMAND = 'datafetcher' _LOCATION_FILENAME = 'locations.json' -_EB_EXTERNAL_NAME = 'sysstartS.58955.83384832176' _ASDM_XML = 'ASDM.xml' +_EB_EXTERNAL_NAME = 'sysstartS.58955.83384832176' -''' -TO EXECUTE THESE TESTS: from apps/cli/executables/datafetcher, - - pytest -vx --logging-level=INFO - -''' - -class TestDataFetcher: - """ IMPORTANT NOTE: we CANNOT retrieve by copy if we don't have access to a - location to which NGAS can write, e.g, lustre. Therefore, any test - that involves -actual- retrieval of files must be by streaming, to - ensure which we must use a Capo profile in which the execution site is - -not- DSOC or NAASC. 
- The reason is this algorithm used in LocationsReport: - - for f in files_report['files']: - if f['server']['cluster'] == Cluster.DSOC and \ - f['server']['location'] == self.settings['execution_site']: - f['server']['retrieve_method'] = RetrievalMode.COPY - else: - f['server']['retrieve_method'] = RetrievalMode.STREAM - - - Be sure to have on the test system a local profile (local.properties) - that meets these criteria: - - - edu.nrao.archive.workflow.config.StartupSettings.temporaryDataDirectory - pointing to a locally writable temp dir, e.g., /var/tmp - - edu.nrao.archive.workflow.config.DeliverySettings.hostname - must point to local computer - - execution_site must NOT be DSOC or NAASC - - - """ - - @pytest.fixture(autouse=True, scope='function') - def setup_settings_datadir(self) -> None: - self.settings = get_test_capo_settings() - self.db_settings = get_metadata_db_settings(TEST_PROFILE) - self.test_data = self._initialize_test_data() - self.DATA_DIR = get_test_data_dir() - if self.DATA_DIR is None: - pytest.fail(f'test data directory not found under {os.getcwd()}') - - @pytest.fixture(autouse=True, scope='function') - def make_tempdir(self) -> None: - umask = os.umask(0o000) - self.top_level = tempfile.mkdtemp() - os.umask(umask) - self._LOG = FlexLogger(__name__, self.top_level) - - def test_bad_command_line(self): - - # bad product locator - args = [_FETCH_COMMAND, - '--product-locator', 'not-even', - '--profile', TEST_PROFILE, '--output-dir', self.top_level] - fetcher = CommandLineFetchLauncher(args, self._LOG) - try: - fetcher.run() - except FileNotFoundError as err: - self._LOG.debug(f'>>> {err}') - raise err - - exception_found = False - terminal_exception_thrown = False - - bad_locator_logfile = find_newest_fetch_log_file(self.top_level) - assert bad_locator_logfile is not None - assert 0 != os.path.getsize(bad_locator_logfile) - with open(bad_locator_logfile) as log: - log_contents = log.readlines() - - for line in log_contents: - if 'NoLocatorException' in line: - exception_found = True - if 'terminal_exception' in line: - terminal_exception_thrown = True - if exception_found and terminal_exception_thrown: - break - assert exception_found - assert terminal_exception_thrown - bad_locator_logfile.unlink() - - # nonexistent locations file - args = [_FETCH_COMMAND, '--location-file', 'aint_got_one', '--output-dir', - self.top_level, '--profile', TEST_PROFILE] - - fetcher = CommandLineFetchLauncher(args, self._LOG) - fetcher.run() - logfile = find_newest_fetch_log_file(self.top_level) - with open(logfile, 'r') as log: - log_contents = log.readlines() - - exception_found = False - terminal_exception_thrown = False - for line in log_contents: - if 'FileNotFoundError' in line: - exception_found = True - if 'terminal_exception' in line: - terminal_exception_thrown = True - if exception_found and terminal_exception_thrown: - break - assert exception_found - - def test_nothing_retrieved_if_dry_on_cmd_line(self): - toplevel = Path(self.top_level) - location_file = get_mini_locations_file( - Path(toplevel, _LOCATION_FILENAME)) - args = [_FETCH_COMMAND, - '--location-file', str(location_file), - '--profile', TEST_PROFILE, '--output-dir', self.top_level, - '--dry', '--verbose'] - fetcher = CommandLineFetchLauncher(args, self._LOG) - output = fetcher.run() - logfile = find_newest_fetch_log_file(self.top_level) - assert [] == output - assert 0 != os.path.getsize(logfile) - Path.unlink(location_file) - - # make sure none of these files written - file_count = 0 - for _ in 
os.walk(location_file): - file_count += 1 - assert 0 == file_count - - def test_force_overwrite_from_cmd_line(self): - toplevel = Path(self.top_level) - location_file = get_mini_locations_file(toplevel / _LOCATION_FILENAME) - dest_dir = Path(toplevel, _EB_EXTERNAL_NAME) - dest_dir.mkdir(parents=True, exist_ok=True) - - # make a fake file to be overwritten - fake_file = dest_dir / _ASDM_XML - with open(fake_file, 'w') as to_write: - to_write.write('alas, my days are numbered') - args = [_FETCH_COMMAND, - '--location-file', str(location_file), - '--profile', TEST_PROFILE, - '--output-dir', self.top_level, - '--force'] - CommandLineFetchLauncher(args, self._LOG).run() - - sizes = dict() - for _, _, fnames in os.walk(dest_dir): - for fname in fnames: - path = dest_dir / fname - sizes[path] = os.path.getsize(path) - assert 37 == len(sizes) - fake_size = os.path.getsize(fake_file) - assert 9339 == fake_size - - def test_no_overwrite_from_cmd_line(self): - toplevel = Path(self.top_level) - location_file = get_mini_locations_file(toplevel / _LOCATION_FILENAME) - dest_dir = toplevel / _EB_EXTERNAL_NAME - dest_dir.mkdir(parents=True, exist_ok=True) - - # make a fake file that shouldn't be overwritten - fake_file = dest_dir / _ASDM_XML - with open(fake_file, 'w') as to_write: - to_write.write("I'm not going anywhere!") - args = [_FETCH_COMMAND, - '--location-file', str(location_file), - '--profile', TEST_PROFILE, '--output-dir', self.top_level] - fetcher = CommandLineFetchLauncher(args, self._LOG) - fetcher.run() - - term_except_found = False - file_exists_found = False - logfile = find_newest_fetch_log_file(self.top_level) - with open(logfile, 'r') as log: - log_contents = log.readlines() - for line in log_contents: - if 'terminal_exception' in line: - term_except_found = True - if 'FileExistsError' in line: - file_exists_found = True - if term_except_found and file_exists_found: - break - - assert term_except_found and file_exists_found - - def test_cmd_line_more_output_when_verbose(self): - report_file = get_mini_locations_file( - Path(self.top_level, 'locations_verbose.json')) - args = [_FETCH_COMMAND, - '--location-file', str(report_file), - '--profile', TEST_PROFILE, '--output-dir', self.top_level, - '--verbose'] - fetcher = CommandLineFetchLauncher(args, self._LOG) - retrieved = fetcher.run() - num_files_expected = 37 - assert num_files_expected == len(retrieved) - - verbose_logfile = find_newest_fetch_log_file(self.top_level) - assert 0 != os.path.getsize(verbose_logfile) - - [file.unlink() for file in retrieved] - verbose_logfile.unlink() - - # same thing, but without verbose - args = [_FETCH_COMMAND, - '--location-file', str(report_file), - '--profile', TEST_PROFILE, '--output-dir', self.top_level] - fetcher = CommandLineFetchLauncher(args, self._LOG) - retrieved = fetcher.run() - assert num_files_expected == len(retrieved) - logfile = find_newest_fetch_log_file(self.top_level) - assert 0 == os.path.getsize(logfile) - - def test_can_stream_from_mini_locations_file(self): - """ gin up a location report with just a few small files in it - and confirm that we can actually stream them - """ - location_file = get_mini_locations_file(Path(self.top_level, _LOCATION_FILENAME)) - - report_file = get_mini_locations_file(location_file) - args = ['--location-file', str(report_file), - '--output-dir', self.top_level, - '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) - fetch = DataFetcher(namespace, self.settings) - retrieved = fetch.run() - file_count = len(retrieved) - assert 
37 == file_count - - def test_verbose_writes_stuff_to_log(self): - path = Path(self.top_level, _LOCATION_FILENAME) - report_file = get_mini_locations_file(path) - args = ['--location-file', str(report_file), - '--output-dir', self.top_level, - '--profile', TEST_PROFILE, '--verbose'] - namespace = get_arg_parser().parse_args(args) - fetch = DataFetcher(namespace, self.settings) - fetch.run() - - logfile = fetch.logfile - assert logfile.is_file() - assert 0 != os.path.getsize(logfile) - - def test_empty_log_if_not_verbose(self): - path = Path(self.top_level, _LOCATION_FILENAME) - report_file = get_mini_locations_file(path) - args = ['--location-file', str(report_file), - '--output-dir', self.top_level, - '--profile', TEST_PROFILE] +# set this to False when debugging one or more tests +# so as not to have to sit thru every test; +# comment out the target test(s)' @pytest.skip +RUN_ALL = True + +# pylint: disable=C0115, C0116, R0902, R0903, R0914, W1203, W0613, W0621 + +@pytest.fixture(autouse=True, scope='function') +def make_tempdir() -> None: + ''' Creates a new temporary working directory for each test. ''' + umask = os.umask(0o000) + top_level = tempfile.mkdtemp(prefix='datafetcher_test_', dir='/var/tmp') + os.umask(umask) + yield top_level + +@pytest.fixture(scope='module') +def capo_settings(): + ''' get Capo settings once for whole module ''' + + def retrieve_capo_settings() -> CapoConfig: + return get_test_capo_settings() + + capo_settings = retrieve_capo_settings() + yield capo_settings + +@pytest.fixture(scope='module') +def settings(capo_settings): + ''' grabs all the settings we will need for the datafetcher: + Capo, database, test data + ''' + db_settings = get_metadata_db_settings(TEST_PROFILE) + test_data = _initialize_test_data(db_settings) + yield Settings(capo_settings, db_settings, test_data) + +def launch_datafetcher(args, capo_settings): + ''' invoke the DF with these args as in df.main(), + launch it with df.run(), + and return the appropriate return/error code + + ''' + + try: namespace = get_arg_parser().parse_args(args) - fetch = DataFetcher(namespace, self.settings) - fetch.run() - - logfile = fetch.logfile - assert logfile.is_file() - assert 0 == os.path.getsize(logfile) - - def test_copy_attempt_throws_sys_exit_service_error(self): - product_locator = self.test_data['13B-014']['product_locator'] - - # use site from non-local profile to guarantee copy attempt - local_exec_site = self.settings['execution_site'] - self.settings['execution_site'] = ExecutionSite.DSOC - - args = ['--product-locator', product_locator, - '--output-dir', self.top_level, - '--profile', self.settings['execution_site'].value, - '--verbose'] - parser = get_arg_parser() - namespace = parser.parse_args(args) - fetch = DataFetcher(namespace, self.settings) - servers_report = fetch.servers_report - for server in servers_report: - entry = servers_report[server] - assert entry['retrieve_method'].value == RetrievalMode.COPY.value - - # let's try just one file so we're not sitting here all day - for server in servers_report: - entry = servers_report[server] - servers_report = {server: entry} + except SystemExit as exc: + print(f'{exc}') + if '--location-file' in args and '--product-locator' in args: + # you can have one but not both + raise exc + if '--profile' not in args \ + or '--profile' not in capo_settings.keys(): + return_code = ReturnCode.MISSING_PROFILE.value['code'] + raise SystemExit(return_code) + raise exc + + return DataFetcherRefactor(namespace, capo_settings).run() + +def 
test_settings_setup(settings): + ''' Ensure that the test settings we're using make sense ''' + assert settings.capo_settings is not None + assert isinstance(settings.capo_settings, dict) + assert settings.db_settings is not None + assert isinstance(settings.db_settings, dict) + assert settings.test_data is not None + assert isinstance(settings.test_data, dict) + +def _initialize_test_data(db_settings): + ''' Set up test data for use in several tests ''' + ext_name = '13B-014.sb28862036.eb29155786.56782.5720116088' + + product_locator = ProductLocatorLookup(db_settings) \ + .look_up_locator_for_ext_name(ext_name) + return {'external_name': ext_name, + 'product_locator': product_locator} + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_usage_statement_makes_sense(): + ''' Ensure that the datafetcher's "usage" statement is as we expect + + ''' + + usage = DataFetcherRefactor._build_usage_message() + assert usage.startswith('Usage:') + lines = usage.split('\n') + assert len(lines) >= len(ReturnCode) + 1 + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_no_args_prints_usage_and_croaks(): + ''' If the datafetcher is called without any arguments, + it should print the "usage" statement, then exit + + ''' + + with pytest.raises(TypeError): + datafetcher = DataFetcherRefactor() + assert datafetcher.usage == DataFetcherRefactor._build_usage_message() + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_omitted_capo_args_returns_expected_code(make_tempdir, settings): + ''' Be sure DF dies with appropriate error message if called without Capo profile ''' + + assert not hasattr(os.environ, 'CAPO_PROFILE') + args = ['--product-locator', 'not-even', + '--output-dir', str(make_tempdir)] + with pytest.raises(SystemExit) as exc: + launch_datafetcher(args, settings.capo_settings) + assert exc.value.code['code'] == 1 + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_omitted_capo_value_returns_expected_code(make_tempdir, settings): + ''' Be sure DF dies with appropriate error message if Capo profile + argument but no profile value + + ''' + + args = ['--product-locator', "we're not going to get this far", + '--output-dir', str(make_tempdir), + '--profile'] + expected = ReturnCode.MISSING_PROFILE.value['code'] + with pytest.raises(SystemExit) as exc: + launch_datafetcher(args, settings.capo_settings) + assert exc.value.code == expected + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_invalid_capo_profile_returns_expected_code(make_tempdir, settings): + ''' Be sure DF dies with appropriate error message if called without Capo profile ''' + + args = ['--profile', 'whatevs', + '--product-locator', "we'll never get this far", + '--output-dir', str(make_tempdir), + ] + expected = ReturnCode.MISSING_PROFILE.value['code'] + with pytest.raises(SystemExit) as exc: + launch_datafetcher(args, settings.capo_settings) + assert exc.value.code['code'] == expected + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_inaccessible_output_dir_returns_expected_code(settings, make_tempdir): + umask = os.umask(0o000) + tmpdir = Path(make_tempdir) + tmpdir.chmod(0o666) + args = ['--product-locator', 'scooby-doo', + '--profile', TEST_PROFILE, + '--output-dir', str(make_tempdir)] + original_settings = settings.capo_settings + try: + with pytest.raises(SystemExit) as exc: + settings.capo_settings.pop('download_dir') + launch_datafetcher(args, settings.capo_settings) + assert exc.value.code == ReturnCode.MISSING_SETTING.value + finally: + tmpdir.chmod(umask) + 
settings.capo_settings = original_settings + # this test results in a log file being written to the + # current directory. nuke it. + curdir = Path('.') + log_files = [file for file in curdir.iterdir() + if str(file).endswith('.log')] + for file in log_files: + file.unlink() + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_two_locator_args_returns_expected_code(make_tempdir, settings): + ''' We should reject invocation with both product locator -and- location + file. One or the other, people! + + ''' + + args = ['--product-locator', 'a_locator', + '--location-file', 'location.json' + '--output-dir', str(make_tempdir), + '--profile', TEST_PROFILE] + with pytest.raises(SystemExit) as exc: + launch_datafetcher(args, settings.capo_settings) + assert exc.value.code == ReturnCode.MISSING_SETTING.value['code'] + +class MockServiceTimeoutReturn: + ''' Simulates locator request service timeout ''' + + @staticmethod + def run(): + return ReturnCode.LOCATOR_SERVICE_TIMEOUT + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_locator_service_timeout_returns_expected_code(monkeypatch, + settings, + make_tempdir): + + def mock_run(*args, **kwargs): + return MockServiceTimeoutReturn.run() + + args = ['--product-locator', settings.test_data['product_locator'], + '--output-dir', str(make_tempdir), + '--profile', TEST_PROFILE] + monkeypatch.setattr(DataFetcherRefactor, "run", mock_run) + return_code = launch_datafetcher(args, settings.capo_settings) + assert return_code == ReturnCode.LOCATOR_SERVICE_TIMEOUT + + +class MockTooManyServiceRedirectsReturn: + + @staticmethod + def run(): + return ReturnCode.TOO_MANY_SERVICE_REDIRECTS + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_too_many_service_redirects_returns_expected_code(monkeypatch, + settings, + make_tempdir): + + def mock_run(*args, **kwargs): + return MockTooManyServiceRedirectsReturn().run() + + args = ['--product-locator', settings.test_data['product_locator'], + '--output-dir', str(make_tempdir), + '--profile', TEST_PROFILE] + monkeypatch.setattr(DataFetcherRefactor, "run", mock_run) + return_code = launch_datafetcher(args, settings.capo_settings) + assert return_code == ReturnCode.TOO_MANY_SERVICE_REDIRECTS + + +class MockCatastrophicServiceErrorReturn: + + @staticmethod + def run(): + return ReturnCode.CATASTROPHIC_REQUEST_ERROR + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_catastrophic_service_error_returns_expected_code(monkeypatch, + settings, + make_tempdir): + + def mock_run(*args, **kwargs): + return MockCatastrophicServiceErrorReturn().run() + + args = ['--product-locator', settings.test_data['product_locator'], + '--output-dir', str(make_tempdir), + '--profile', TEST_PROFILE] + + monkeypatch.setattr(DataFetcherRefactor, "run", mock_run) + return_code = launch_datafetcher(args, settings.capo_settings) + assert return_code == ReturnCode.CATASTROPHIC_REQUEST_ERROR + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_product_locator_not_found_returns_expected_code(make_tempdir, + settings): + args = ['--product-locator', 'not_a_locator', + '--output-dir', str(make_tempdir), + '--profile', TEST_PROFILE] + with pytest.raises(SystemExit) as exc: + launch_datafetcher(args, settings.capo_settings) + assert exc.value.code == ReturnCode.PRODUCT_LOCATOR_NOT_FOUND.value + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_unable_to_open_location_file_returns_expected_code(make_tempdir, settings): + args = ['--location-file', 'location.json', + '--output-dir', str(make_tempdir), + 
'--profile', TEST_PROFILE] + with pytest.raises(SystemExit) as exc: + launch_datafetcher(args, settings.capo_settings) + assert exc.value.code == ReturnCode.CANNOT_OPEN_LOCATION_FILE.value + + +class MockNgasFetchError: + ''' Simulates a problem fetching file(s) from NGAS server ''' + @staticmethod + def run(): + return ReturnCode.NGAS_FETCH_ERROR + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_error_fetching_file_from_ngas_returns_expected_code(monkeypatch, + settings, + make_tempdir): + + def mock_run(*args, **kwargs): + return MockNgasFetchError().run() + + args = ['--product-locator', settings.test_data['product_locator'], + '--output-dir', str(make_tempdir), + '--profile', TEST_PROFILE] + monkeypatch.setattr(DataFetcherRefactor, "run", mock_run) + return_code = launch_datafetcher(args, settings.capo_settings) + assert return_code == ReturnCode.NGAS_FETCH_ERROR + + +class MockSizeMismatchError: + ''' Simulates a retrieved file of unexpected size ''' + @staticmethod + def run(): + return ReturnCode.SIZE_MISMATCH + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_unexpected_size_returns_expected_code(monkeypatch, + settings, + make_tempdir): + + def mock_run(*args, **kwargs): + return MockSizeMismatchError().run() + + args = ['--product-locator', settings.test_data['product_locator'], + '--output-dir', str(make_tempdir), + '--profile', TEST_PROFILE] + monkeypatch.setattr(DataFetcherRefactor, "run", mock_run) + return_code = launch_datafetcher(args, settings.capo_settings) + assert return_code == ReturnCode.SIZE_MISMATCH + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_nothing_retrieved_if_dry_locator(make_tempdir, settings): + ''' Simulates dry run with product locator ''' + + args = ['--product-locator', settings.test_data['product_locator'], + '--output-dir', str(make_tempdir), + '--profile', TEST_PROFILE, + '--dry-run' + ] + return_code = launch_datafetcher(args, settings.capo_settings) + assert return_code == 0 + tempdir_files = Path(make_tempdir).iterdir() + for file in tempdir_files: + if not str(file).endswith('.log') \ + and not str(file).endswith('.json'): + pytest.fail('dry run with product locator -DID- fetch files') + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_nothing_retrieved_if_dry_file(make_tempdir, settings): + ''' dry run with locations file ''' + + output_dir = Path(make_tempdir) + locations_file = get_mini_locations_file(Path(output_dir, _LOCATION_FILENAME)) + args = ['--location-file', str(locations_file), + '--output-dir', str(make_tempdir), + '--profile', TEST_PROFILE, + '--dry-run' + ] + launch_datafetcher(args, settings.capo_settings) + tempdir_files = output_dir.iterdir() + for file in tempdir_files: + if not str(file).endswith('.log') \ + and not str(file).endswith('.json'): + pytest.fail('dry run with locations file -DID- fetch files') + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_force_flag_overwrites_existing_file(make_tempdir, settings): + top_level = Path(make_tempdir) + location_file = get_mini_locations_file(top_level / _LOCATION_FILENAME) + dest_dir = Path(top_level, _EB_EXTERNAL_NAME) + dest_dir.mkdir(parents=True, exist_ok=True) + + # make a fake file to be overwritten + fake_file = dest_dir / _ASDM_XML + with open(fake_file, 'w') as to_write: + to_write.write('alas, my days are numbered') + + args = ['--location-file', str(location_file), + '--profile', TEST_PROFILE, + '--output-dir', str(top_level), + '--force'] + try: + launch_datafetcher(args, settings.capo_settings) + except 
SystemExit as ex: + pytest.fail(f'{ex}') + except Exception as exc: + pytest.fail(f'{exc}') + + sizes = dict() + + # go thru destination directory recursively + # and get everybody's size + for file in dest_dir.rglob('*'): + sizes[str(file)] = file.stat().st_size + assert len(sizes) == 37 + fake_size = fake_file.stat().st_size + assert fake_size == 9339 + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_no_overwrite_without_force(make_tempdir, settings): + top_level = Path(make_tempdir) + location_file = get_mini_locations_file(top_level / _LOCATION_FILENAME) + dest_dir = Path(top_level, _EB_EXTERNAL_NAME) + dest_dir.mkdir(parents=True, exist_ok=True) + + # make a fake file to be overwritten + fake_file = dest_dir / _ASDM_XML + with open(fake_file, 'w') as to_write: + to_write.write('alas, my days are numbered') + + args = ['--location-file', str(location_file), + '--profile', TEST_PROFILE, + '--output-dir', str(top_level) + ] + + with pytest.raises(SystemExit) as exc: + launch_datafetcher(args, settings.capo_settings) + assert exc.value.code == ReturnCode.NGAS_FETCH_ERROR.value['code'] + + sizes = dict() + for file in dest_dir.rglob('*'): + sizes[str(file)] = file.stat().st_size + assert len(sizes) < 37 + fake_size = fake_file.stat().st_size + assert fake_size == 26 + + +@pytest.mark.skipif(not RUN_ALL, reason='debug') +def test_more_output_when_verbose(make_tempdir, settings): + top_level = Path(make_tempdir) + location_file = get_mini_locations_file(top_level / _LOCATION_FILENAME) + + args = ['--location-file', str(location_file), + '--profile', TEST_PROFILE, + '--output-dir', str(make_tempdir), + '--verbose'] + + return_code = launch_datafetcher(args, settings.capo_settings) + assert return_code == ReturnCode.SUCCESS.value['code'] + + num_files_expected = 37 + retrieved = [file for file in top_level.rglob('*') + if file.is_file()] + assert num_files_expected == len(retrieved) - 2 + + verbose_logfile = None + for file in retrieved: + if str(file).endswith('.log'): + verbose_logfile = file break - fetch.servers_report = servers_report - assert fetch.servers_report[server] is not None - files = fetch.servers_report[server]['files'] - fetch.servers_report[server]['files'] = [files[0]] - - try: - with pytest.raises(SystemExit) as s_ex: - fetch.run() - assert Errors.NGAS_SERVICE_ERROR.value == s_ex.value.code - finally: - self.settings['execution_site'] = local_exec_site - - def test_dies_with_bad_server_info(self): - report_file = get_locations_file('VLA_BAD_SERVER') - args = ['--location-file', str(report_file), - '--output-dir', self.top_level, - '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) - fetch = DataFetcher(namespace, self.settings) - with pytest.raises(SystemExit) as s_ex: - fetch.run() - exc_code = s_ex.value.code - expected = Errors.NGAS_SERVICE_ERROR.value - assert expected == exc_code - - def test_throws_sys_exit_file_exists_if_overwrite_not_forced(self): - toplevel = Path(self.top_level) - location_file = get_mini_locations_file( - Path(self.top_level, _LOCATION_FILENAME)) - assert Path.exists(location_file) - destination = Path(toplevel, _EB_EXTERNAL_NAME) - Path(destination).mkdir(parents=True, exist_ok=True) - assert destination.is_dir() - - # stick a fake SDM in there so it will fall over - fake_file = Path(destination, _ASDM_XML) - with open(fake_file, 'w') as to_write: - to_write.write('lalalalalala') - assert fake_file.exists() - assert os.path.getsize(fake_file) != 0 - - args = ['--location-file', str(location_file), - 
'--output-dir', self.top_level, - '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) - # exception should be thrown because one of the files to be retrieved - # is in the destination dir and we're not forcing overwrite here - with pytest.raises(SystemExit) as exc: - DataFetcher(namespace, self.settings).run() - exc_code = exc.value.code - expected = Errors.FILE_EXISTS_ERROR.value - assert expected == exc_code - - def test_overwrites_when_forced(self): - external_name = LOCATION_REPORTS[_VLA_SMALL_KEY]['external_name'] - toplevel = Path(self.top_level) - destination = toplevel / external_name - destination.mkdir(parents=True, exist_ok=True) - assert destination.is_dir() - - # stick a fake SDM in there to see if overwrite really happens - to_overwrite = _ASDM_XML - fake_file = destination / to_overwrite - text = '"Bother!" said Pooh. "Lock phasers on that heffalump!"' - with open(fake_file, 'w') as to_write: - to_write.write(text) - assert fake_file.exists() - assert len(text) == os.path.getsize(fake_file) - report_metadata = LOCATION_REPORTS['VLA_SMALL_EB'] - external_name = report_metadata['external_name'] - destination = toplevel / external_name - Path(destination).mkdir(parents=True, exist_ok=True) - - json_path = destination / report_metadata['filename'] - report_file = get_mini_locations_file(json_path) - args = ['--location-file', str(report_file), - '--output-dir', self.top_level, - '--profile', TEST_PROFILE, '--force'] - namespace = get_arg_parser().parse_args(args) - report = LocationsReport(self._LOG, namespace, self.settings) - - # expecting 37 files - files = report.files_report['files'] - - sizes = [file['size'] for file in files] - total_size_expected = sum(sizes) - num_files_expected = 37 - assert num_files_expected == len(files) - - fetch = DataFetcher(namespace, self.settings) - retrieved = fetch.run() - assert num_files_expected == len(retrieved) - - # delete the .json so it doesn't mess up our total size computation - Path.unlink(report_file) - - total_size_actual = 0 - dest = Path(destination) - for dirpath, _, filenames in os.walk(dest): - for fname in filenames: - path = Path(dirpath, fname) - total_size_actual += os.path.getsize(path) - assert total_size_expected == total_size_actual - - def test_sys_exit_file_error_on_bad_destination(self): - file_spec = self.test_data['13B-014'] - args = ['--product-locator', file_spec['product_locator'], - '--output-dir', '/foo', - '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) - with pytest.raises(SystemExit) as s_ex: - DataFetcher(namespace, self.settings) - assert Errors.FILE_NOT_FOUND_ERROR.value == s_ex.value.code + assert verbose_logfile is not None + verbose_log_size = verbose_logfile.stat().st_size + assert verbose_log_size > 0 - def test_sys_exit_no_locator_for_bad_product_locator(self): - args = ['--product-locator', '/foo', - '--output-dir', self.top_level, '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) + # get rid of all the files we downloaded, plus the log + deleted = [file.unlink() for file in retrieved + if not str(file).endswith('.json')] + assert len(deleted) >= num_files_expected - with pytest.raises(SystemExit) as s_ex: - fetch = DataFetcher(namespace, self.settings) - fetch.run() - assert Errors.NO_LOCATOR.value == s_ex.value.code - - def test_gets_expected_test_data(self): - assert self.test_data['13B-014'] is not None - file_spec = self.test_data['13B-014'] - assert '13B-014.sb28862036.eb29155786.56782.5720116088' == 
file_spec['external_name'] - locator = file_spec['product_locator'] - assert locator.startswith('uid://evla/execblock/') - - def test_gets_vlbas_from_report_file(self): - report_file = get_locations_file('VLBA_EB') - args = ['--location-file', str(report_file), - '--output-dir', self.top_level, '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) - fetch = DataFetcher(namespace, self.settings) - report_files = fetch.locations_report.files_report['files'] - - assert 16 == len(report_files) - expected_files = [Path(self.top_level, item['relative_path']) - for item in report_files] - - # files we're getting take waaaaayyy too long to fetch in a test case, - # so we're mocking DataFetcher.run() - fetch.run = MagicMock(return_value=expected_files) - actual_files = fetch.run() - num_expected = len(expected_files) - assert num_expected == len(actual_files) - - match_count = 0 - for exp_file in expected_files: - for act_file in actual_files: - act_parent = act_file.name - if act_parent == exp_file.name: - match_count += 1 - break - assert num_expected == match_count - - def test_gets_large_vla_ebs_from_report_file(self): - report_file = get_locations_file('VLA_LARGE_EB') - args = ['--location-file', str(report_file), - '--output-dir', self.top_level, '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) - fetch = DataFetcher(namespace, self.settings) - report_files = fetch.locations_report.files_report['files'] - assert 46 == len(report_files) - toplevel = Path(self.top_level) - expected_files = [toplevel / item['relative_path'] - for item in report_files] - fetch.run = MagicMock(return_value=expected_files) - actual_files = fetch.run() - num_expected = len(expected_files) - assert num_expected == len(actual_files) - - def test_gets_images_from_report_file(self): - report_file = get_locations_file('IMG') - args = ['--location-file', str(report_file), - '--output-dir', self.top_level, '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) - fetch = DataFetcher(namespace, self.settings) - report_files = fetch.locations_report.files_report['files'] - assert 2 == len(report_files) - toplevel = Path(self.top_level) - expected_files = [toplevel / item['relative_path'] - for item in report_files] - # files are too big to fetch in a test; mock DataFetcher.run() - fetch.run = MagicMock(return_value=expected_files) - actual_files = fetch.run() - num_expected = len(expected_files) - assert num_expected == len(actual_files) - - def test_gets_calibration_from_report_file(self): - report_file = get_locations_file('CALIBRATION') - args = ['--location-file', str(report_file), - '--output-dir', self.top_level, '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) - fetch = DataFetcher(namespace, self.settings) - report_files = fetch.locations_report.files_report['files'] - assert 1 == len(report_files) - file_spec = report_files[0] - - # calibration will have external name = relative path = subdirectory - relative_path = file_spec['relative_path'] - assert relative_path == file_spec['subdirectory'] - - expected_files = [Path(self.top_level, relative_path)] - fetch.run = MagicMock(return_value=expected_files) - actual_files = fetch.run() - num_expected = len(expected_files) - assert num_expected == len(actual_files) - - def test_gets_calibration_from_locator(self): - external_name = LOCATION_REPORTS['CALIBRATION']['external_name'] - product_locator = ProductLocatorLookup( - 
self.db_settings).look_up_locator_for_ext_name(external_name) - args = ['--product-locator', product_locator, - '--output-dir', self.top_level, '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) - fetch = DataFetcher(namespace, self.settings) - report_files = fetch.locations_report.files_report['files'] - assert 1 == len(report_files) - - file_spec = report_files[0] - - # calibration will have external name = relative path = subdirectory - relative_path = file_spec['relative_path'] - assert external_name == relative_path - assert relative_path == file_spec['subdirectory'] - - expected_files = [Path(self.top_level) / relative_path] - fetch.run = MagicMock(return_value=expected_files) - actual_files = fetch.run() - num_expected = len(expected_files) - assert num_expected == len(actual_files) - - def test_retrieval_finds_size_mismatch(self): - report_spec = LOCATION_REPORTS[_VLA_SMALL_KEY] - external_name = report_spec['external_name'] - - data_dir = Path(self.DATA_DIR) - locations_file = data_dir / 'VLA_SMALL_EB_BUSTED.json' - args = ['--location-file', str(locations_file), - '--output-dir', self.top_level, '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) - fetch1 = DataFetcher(namespace, self.settings) - report_files = fetch1.locations_report.files_report['files'] - assert 44 == len(report_files) - - filename = 'Weather.xml' - for file in report_files: - if filename == file['relative_path']: - assert 165100 == file['size'] - break - - product_locator = ProductLocatorLookup(self.db_settings) \ - .look_up_locator_for_ext_name(external_name) - args = ['--product-locator', product_locator, - '--output-dir', self.top_level, '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) - fetch2 = DataFetcher(namespace, self.settings) - - locations_report = get_locations_report(_VLA_SMALL_KEY) - fetch2.run = MagicMock(return_value=locations_report['files']) - locator_files = fetch2.run() - assert len(report_files) == len(locator_files) - for file1 in report_files: - for file2 in locator_files: - if file2['relative_path'] == file1['relative_path']: - if filename != file1['relative_path']: - assert file2['size'] == file1['size'] - else: - assert file2['size'] != file1['size'] - break - - def test_throws_sys_exit_missing_setting_if_no_args(self): - args = [] - with pytest.raises(SystemExit) as s_ex: - get_arg_parser().parse_args(args) - assert Errors.MISSING_SETTING.value == s_ex.value.code - - def test_throws_sys_exit_no_locator_if_no_product_locator(self): - args = ['--product-locator', '', - '--output-dir', self.top_level, '--profile', TEST_PROFILE] - namespace = get_arg_parser().parse_args(args) + ### same download, but without verbose logging + args = ['--location-file', str(location_file), + '--profile', TEST_PROFILE, + '--output-dir', str(make_tempdir)] + + return_code = launch_datafetcher(args, settings.capo_settings) + assert return_code == ReturnCode.SUCCESS.value['code'] - with pytest.raises(SystemExit) as s_ex: - DataFetcher(namespace, self.settings) - assert Errors.NO_LOCATOR.value == s_ex.value.code + retrieved = [file for file in top_level.rglob('*')] + assert len(retrieved) == num_files_expected + 3 - # -------------------------------------------------------------------------- - # - # U T I L I T I E S - # - # -------------------------------------------------------------------------- + logfile = None + for file in retrieved: + if str(file).endswith('.log'): + logfile = file + break + assert logfile is not None + + logsize 
+    logsize = logfile.stat().st_size
+    # successful download, non-verbose logging,
+    # should result in zero-size log file
+    assert logsize == 0
+
+@pytest.mark.skipif(not RUN_ALL, reason='debug')
+def test_copy_attempt_throws_sys_exit_service_error(make_tempdir, settings):
+    local_exec_site = settings.capo_settings['execution_site']
+
+    # use site from non-local profile to guarantee copy attempt
+    settings.capo_settings['execution_site'] = ExecutionSite.DSOC
+
+    args = ['--product-locator', settings.test_data['product_locator'],
+            '--output-dir', str(make_tempdir),
+            '--profile', 'nmprod']
+    namespace = get_arg_parser().parse_args(args)
+    fetcher = DataFetcherRefactor(namespace, settings.capo_settings)
+
+    servers_report = fetcher.servers_report
+    for server in servers_report:
+        entry = servers_report[server]
+        assert entry['retrieve_method'].value == RetrievalMode.COPY.value
+
+    # let's try just one file so we're not sitting here all day
+    for server in servers_report:
+        entry = servers_report[server]
+        servers_report = {server: entry}
+        fetcher.servers_report = servers_report
+        assert fetcher.servers_report[server] is not None
+        break
+    files = fetcher.servers_report[server]['files']
+    fetcher.servers_report[server]['files'] = [files[0]]
+
+    try:
+        with pytest.raises(SystemExit) as exc:
+            fetcher.run()
+        assert exc.value.code == \
+            ReturnCode.CATASTROPHIC_REQUEST_ERROR.value['code']
+    finally:
+        settings.capo_settings['execution_site'] = local_exec_site
+
+@pytest.mark.skipif(not RUN_ALL, reason='debug')
+def test_dies_with_bad_server_info(make_tempdir, settings):
+    location_file = get_locations_file('VLA_BAD_SERVER')
+    args = ['--profile', TEST_PROFILE, '--output-dir', str(make_tempdir),
+            '--location-file', str(location_file)]
+    with pytest.raises(SystemExit) as exc:
+        launch_datafetcher(args, settings.capo_settings)
+    error = ReturnCode.NGAS_FETCH_ERROR
+    expected = error.value['code']
+    assert exc.value.code == expected
+
+@pytest.mark.skipif(not RUN_ALL, reason='debug')
+def test_missing_setting_error_on_bad_destination(settings):
+    args = ['--profile', TEST_PROFILE,
+            '--product-locator', settings.test_data['product_locator'],
+            '--output-dir', 'floob']
+    with pytest.raises(SystemExit) as exc:
+        launch_datafetcher(args, settings.capo_settings)
+    assert exc.value.code == ReturnCode.MISSING_SETTING.value['code']
+
+
+def write_fake_file(destination: Path, file_info: dict):
+    filename = file_info['ngas_file_id']
+    path = Path(destination, filename)
+    with open(path, 'w') as file:
+        file.write(f'{str(file_info["size"])}\n')
+
+
+class MockSuccessfulFetchReturn:
     @staticmethod
-    def _remove_large_files_from_location_report(locations_in: LocationsReport):
-        ''' strip files > 100000 bytes from location report, so we can try
-            an actual stream without it taking forever
-
-            :returns: LocationsReport
-        '''
-
-        files = locations_in['files']
-        locations_out = locations_in.copy()
-        locations_out['files'] = \
-            [file for file in files if file['size'] <= 100000]
-        return locations_out
-
-    def _initialize_test_data(self):
-        ext_name = '13B-014.sb28862036.eb29155786.56782.5720116088'
-
-        product_locator = ProductLocatorLookup(self.db_settings) \
-            .look_up_locator_for_ext_name(ext_name)
-        dict13b = {'external_name': ext_name,
-                   'product_locator': product_locator}
-
-        to_return = {'13B-014': dict13b}
-        return to_return
-
-
-class CommandLineFetchLauncher:
-    """ Launches DataFetcher from command line, with logging
-    """
-
-    def __init__(self, args: List, logger: FlexLogger):
-        args_to_parse = args if args[0] != _FETCH_COMMAND else args[1:]
-        self._LOG = logger
-        namespace = get_arg_parser().parse_args(args_to_parse)
-        self.args = args
-        self.output_dir = Path(namespace.output_dir)
-
-        if not Path.is_dir(self.output_dir):
-            raise FileNotFoundError(f'{self.output_dir} not found')
-        elif not os.access(self.output_dir, os.R_OK):
-            raise PermissionError(f'{self.output_dir} not found')
-        self.verbose = namespace.verbose
-
-    def run(self):
-        ''' launch fetch from command line
-            @:returns directory listing
-        '''
-
-        with subprocess.Popen(self.args,
-                              stdout=subprocess.PIPE,
-                              stderr=subprocess.STDOUT,
-                              bufsize=1,
-                              universal_newlines=True) as proc:
-            return self.analyze_output(proc)
-
-    def analyze_output(self, proc):
-        if proc.stderr:
-            for err in proc.stderr:
-                self._LOG.error(err.strip())
-
-        output = proc.stdout
-        error_found = output.errors
-        if error_found:
-            if isinstance(error_found, list):
-                [self._LOG.error(line) for line in error_found]
-            else:
-                if error_found != 'strict':
-                    self._LOG.error(error_found)
-
-        lines = list()
-        for line in output:
-            lines.append(line.strip())
-
-        for i in range(0, len(lines) - 1):
-            line = lines[i]
-            self._LOG.debug(f'{line}')
-            if 'error' in line.lower():
-                # log everything after the error
-                for j in range(i, len(lines) - 1):
-                    self._LOG.error(lines[j])
-            if 'debug' in line.lower() and self.verbose:
-                self._LOG.debug(line)
-            if 'warn' in line.lower():
-                self._LOG.warning(line)
-
-        files_retrieved = list()
-        for root, dirnames, filenames in os.walk(self.output_dir):
-            root_dir = Path(root)
-            if dirnames:
-                subdir = root_dir / dirnames[0]
-            else:
-                subdir = root_dir
-            for filename in filenames:
-                if not filename.endswith('.log') and not filename.endswith('.json'):
-                    files_retrieved.append(subdir / filename)
-        self._LOG.debug(f'>>> {len(files_retrieved)} files retrieved')
-        return files_retrieved
+    def run():
+        return 0
+
+@pytest.fixture
+def mock_successful_fetch_run(monkeypatch):
+
+    def mock_run(*args, **kwargs):
+        return MockSuccessfulFetchReturn().run()
+
+    monkeypatch.setattr(DataFetcherRefactor, "run", mock_run)
+
+@pytest.mark.skipif(not RUN_ALL, reason='debug')
+def test_gets_vlbas_from_report_file(mock_successful_fetch_run,
+                                     make_tempdir,
+                                     settings):
+
+    location_file = get_locations_file('VLBA_EB')
+    args = ['--profile', TEST_PROFILE, '--output-dir', str(make_tempdir),
+            '--location-file', str(location_file)]
+    fetcher = DataFetcherRefactor(get_arg_parser().parse_args(args),
+                                  settings.capo_settings)
+    servers_report = fetcher.servers_report
+    assert len(servers_report) == 1
+
+    return_code = fetcher.run()
+    assert return_code == 0
+
+    dest_dir = Path(make_tempdir)
+    file_info_dict = dict()
+    for server in servers_report.items():
+        assert server[0] == 'nmngas03.aoc.nrao.edu:7777'
+        values = server[1]
+        assert values['location'] == Location.DSOC.value
+        assert values['cluster'] == Cluster.DSOC.value
+        assert values['retrieve_method'] == RetrievalMode.STREAM
+        file_values = values['files']
+        assert len(file_values) == 16
+        for filename in file_values:
+            write_fake_file(dest_dir, filename)
+            file_info_dict[filename['ngas_file_id']] = filename
+
+    datafetcher = DataFetcherRefactor(get_arg_parser().parse_args(args),
+                                      settings.capo_settings)
+    return_code = datafetcher.run()
+    assert return_code == 0
+
+    for filename in file_info_dict:
+        path = Path(dest_dir, filename)
+        assert path.is_file()
+        contents = path.read_text().strip()
+        assert int(contents) == file_info_dict[filename]['size']
+
+@pytest.mark.skipif(not RUN_ALL, reason='debug')
+def test_gets_large_vla_ebs_from_report_file(mock_successful_fetch_run,
+                                             make_tempdir,
+                                             settings):
+    location_file = get_locations_file('VLA_SMALL_EB')
+    args = ['--profile', TEST_PROFILE, '--output-dir', str(make_tempdir),
+            '--location-file', str(location_file)]
+    fetcher = DataFetcherRefactor(get_arg_parser().parse_args(args),
+                                  settings.capo_settings)
+    servers_report = fetcher.servers_report
+    assert len(servers_report) == 2
+
+    return_code = fetcher.run()
+    assert return_code == 0
+
+    server_file_count = {'nmngas03.aoc.nrao.edu:7777': 0,
+                         'nmngas04.aoc.nrao.edu:7777': 0}
+    dest_dir = Path(make_tempdir)
+    file_list = list()
+    for server in servers_report.items():
+        server_url = server[0]
+        assert server_url in server_file_count.keys()
+        values = server[1]
+        assert values['location'] == Location.DSOC.value
+        assert values['cluster'] == Cluster.DSOC.value
+        assert values['retrieve_method'] == RetrievalMode.STREAM
+        file_values = values['files']
+        server_file_count[server_url] += len(file_values)
+
+        for filename in file_values:
+            write_fake_file(dest_dir, filename)
+            file_list.append(values)
+
+    assert server_file_count['nmngas03.aoc.nrao.edu:7777'] == 3
+    assert server_file_count['nmngas04.aoc.nrao.edu:7777'] == 41
+
+    datafetcher = DataFetcherRefactor(get_arg_parser().parse_args(args),
+                                      settings.capo_settings)
+    return_code = datafetcher.run()
+    assert return_code == 0
+
+    found_count = 0
+    for file_info in file_list:
+        for file in file_info['files']:
+            filename = file['ngas_file_id']
+            path = Path(dest_dir, filename)
+            assert path.is_file()
+            contents = path.read_text().strip()
+            assert int(contents) == file['size']
+            found_count += 1
+
+    assert found_count == len(file_list)
+
+@pytest.mark.skipif(not RUN_ALL, reason='debug')
+def test_gets_images_from_report_file(mock_successful_fetch_run,
+                                      make_tempdir,
+                                      settings):
+    location_file = get_locations_file('IMG')
+    args = ['--profile', TEST_PROFILE, '--output-dir', str(make_tempdir),
+            '--location-file', str(location_file)]
+    fetcher = DataFetcherRefactor(get_arg_parser().parse_args(args),
+                                  settings.capo_settings)
+    servers_report = fetcher.servers_report
+    assert len(servers_report) == 2
+
+    server_file_count = {'nmngas01.aoc.nrao.edu:7777': 0,
+                         'nmngas02.aoc.nrao.edu:7777': 0}
+    dest_dir = Path(make_tempdir)
+    file_list = list()
+    for server in servers_report.items():
+        server_url = server[0]
+        assert server_url in server_file_count.keys()
+        values = server[1]
+        assert values['location'] == Location.DSOC.value
+        assert values['cluster'] == Cluster.DSOC.value
+        assert values['retrieve_method'] == RetrievalMode.STREAM
+        file_values = values['files']
+        server_file_count[server_url] += len(file_values)
+
+        for filename in file_values:
+            write_fake_file(dest_dir, filename)
+            file_list.append(values)
+
+    for server_url, count in server_file_count.items():
+        assert count == 1
+
+    return_code = fetcher.run()
+    assert return_code == 0
+
+    found_count = 0
+    for file_info in file_list:
+        for file in file_info['files']:
+            filename = file['ngas_file_id']
+            path = Path(dest_dir, filename)
+            assert path.is_file()
+            contents = path.read_text().strip()
+            assert int(contents) == file['size']
+            found_count += 1
+
+    assert found_count == len(file_list)
+
+@pytest.mark.skipif(not RUN_ALL, reason='debug')
+def test_gets_calibration_from_report_file(mock_successful_fetch_run,
+                                           make_tempdir,
+                                           settings):
+    location_file = get_locations_file('CALIBRATION')
+    args = ['--profile', TEST_PROFILE, '--output-dir', str(make_tempdir),
+            '--location-file', str(location_file)]
+    fetcher = DataFetcherRefactor(get_arg_parser().parse_args(args),
+                                  settings.capo_settings)
+    servers_report = fetcher.servers_report
+    assert len(servers_report) == 1
+
+    fake_file = None
+    file_info = None
+    # (there will be just one file, therefore iteration)
+    for server in servers_report.items():
+        metadata = server[1]
+        destination = Path(make_tempdir)
+        file_info = metadata['files'][0]
+        fake_file = Path(destination, file_info['ngas_file_id'])
+        write_fake_file(destination, file_info)
+
+    assert fake_file.is_file()
+    contents = fake_file.read_text().strip()
+    assert int(contents) == file_info['size']
+
+@pytest.mark.skipif(not RUN_ALL, reason='debug')
+def test_gets_calibration_from_locator(mock_successful_fetch_run,
+                                       make_tempdir,
+                                       settings):
+    external_name = LOCATION_REPORTS['CALIBRATION']['external_name']
+    product_locator = ProductLocatorLookup(
+        settings.db_settings).look_up_locator_for_ext_name(external_name)
+    args = ['--product-locator', product_locator,
+            '--output-dir', str(make_tempdir), '--profile', TEST_PROFILE]
+    namespace = get_arg_parser().parse_args(args)
+    fetch = DataFetcherRefactor(namespace, settings.capo_settings)
+    report_files = fetch.locations_report.files_report['files']
+    assert len(report_files) == 1
+
+    file_spec = report_files[0]
+    relative_path = file_spec['relative_path']
+    assert external_name == relative_path
+    assert relative_path == file_spec['subdirectory']
+    destination = Path(make_tempdir) / relative_path
+    destination.mkdir()
+    write_fake_file(destination, file_spec)
+
+    fake_file = Path(destination, file_spec['ngas_file_id'])
+    assert fake_file.is_file()
+    contents = fake_file.read_text().strip()
+    assert int(contents) == file_spec['size']
+
+
+class Settings:
+
+    def __init__(self, capo_settings, db_settings, test_data):
+        self.capo_settings = capo_settings
+        self.db_settings = db_settings
+        self.test_data = test_data