""" datafetcher unit tests """ import os import subprocess import tempfile from pathlib import Path from typing import List from unittest.mock import MagicMock import pytest from datafetcher.commands import DataFetcher from datafetcher.errors import Errors from datafetcher.locations_report import LocationsReport from datafetcher.utilities import get_arg_parser, ExecutionSite, \ RetrievalMode, FlexLogger, ProductLocatorLookup from .testing_utils import TEST_PROFILE, LOCATION_REPORTS, \ get_locations_report, get_locations_file, \ get_mini_locations_file, find_newest_fetch_log_file, get_test_data_dir, \ get_metadata_db_settings, get_test_capo_settings _VLA_SMALL_KEY = 'VLA_SMALL_EB' _FETCH_COMMAND = 'datafetcher' _LOCATION_FILENAME = 'locations.json' _EB_EXTERNAL_NAME = 'sysstartS.58955.83384832176' _ASDM_XML = 'ASDM.xml' ''' TO EXECUTE THESE TESTS: from apps/cli/executables/datafetcher, pytest -vx --logging-level=INFO ''' class TestDataFetcher: """ IMPORTANT NOTE: we CANNOT retrieve by copy if we don't have access to a location to which NGAS can write, e.g, lustre. Therefore, any test that involves -actual- retrieval of files must be by streaming, to ensure which we must use a Capo profile in which the execution site is -not- DSOC or NAASC. The reason is this algorithm used in LocationsReport: for f in files_report['files']: if f['server']['cluster'] == Cluster.DSOC and \ f['server']['location'] == self.settings['execution_site']: f['server']['retrieve_method'] = RetrievalMode.COPY else: f['server']['retrieve_method'] = RetrievalMode.STREAM Be sure to have on the test system a local profile (local.properties) that meets these criteria: - edu.nrao.archive.workflow.config.StartupSettings.temporaryDataDirectory pointing to a locally writable temp dir, e.g., /var/tmp - edu.nrao.archive.workflow.config.DeliverySettings.hostname must point to local computer - execution_site must NOT be DSOC or NAASC """ @pytest.fixture(autouse=True, scope='function') def setup_settings_datadir(self) -> None: self.settings = get_test_capo_settings() self.db_settings = get_metadata_db_settings(TEST_PROFILE) self.test_data = self._initialize_test_data() self.DATA_DIR = get_test_data_dir() if self.DATA_DIR is None: pytest.fail(f'test data directory not found under {os.getcwd()}') @pytest.fixture(autouse=True, scope='function') def make_tempdir(self) -> None: umask = os.umask(0o000) self.top_level = tempfile.mkdtemp() os.umask(umask) self._LOG = FlexLogger(__name__, self.top_level) def test_bad_command_line(self): # bad product locator args = [_FETCH_COMMAND, '--product-locator', 'not-even', '--profile', TEST_PROFILE, '--output-dir', self.top_level] fetcher = CommandLineFetchLauncher(args, self._LOG) try: fetcher.run() except FileNotFoundError as err: self._LOG.debug(f'>>> {err}') raise err exception_found = False terminal_exception_thrown = False bad_locator_logfile = find_newest_fetch_log_file(self.top_level) assert bad_locator_logfile is not None assert 0 != os.path.getsize(bad_locator_logfile) with open(bad_locator_logfile) as log: log_contents = log.readlines() for line in log_contents: if 'NoLocatorException' in line: exception_found = True if 'terminal_exception' in line: terminal_exception_thrown = True if exception_found and terminal_exception_thrown: break assert exception_found assert terminal_exception_thrown bad_locator_logfile.unlink() # nonexistent locations file args = [_FETCH_COMMAND, '--location-file', 'aint_got_one', '--output-dir', self.top_level, '--profile', TEST_PROFILE] fetcher = 
    @pytest.fixture(autouse=True, scope='function')
    def setup_settings_datadir(self) -> None:
        self.settings = get_test_capo_settings()
        self.db_settings = get_metadata_db_settings(TEST_PROFILE)
        self.test_data = self._initialize_test_data()
        self.DATA_DIR = get_test_data_dir()
        if self.DATA_DIR is None:
            pytest.fail(f'test data directory not found under {os.getcwd()}')

    @pytest.fixture(autouse=True, scope='function')
    def make_tempdir(self) -> None:
        umask = os.umask(0o000)
        self.top_level = tempfile.mkdtemp()
        os.umask(umask)
        self._LOG = FlexLogger(__name__, self.top_level)

    def test_bad_command_line(self):
        # bad product locator
        args = [_FETCH_COMMAND,
                '--product-locator', 'not-even',
                '--profile', TEST_PROFILE,
                '--output-dir', self.top_level]
        fetcher = CommandLineFetchLauncher(args, self._LOG)
        try:
            fetcher.run()
        except FileNotFoundError as err:
            self._LOG.debug(f'>>> {err}')
            raise

        exception_found = False
        terminal_exception_thrown = False

        bad_locator_logfile = find_newest_fetch_log_file(self.top_level)
        assert bad_locator_logfile is not None
        assert 0 != os.path.getsize(bad_locator_logfile)
        with open(bad_locator_logfile) as log:
            log_contents = log.readlines()

        for line in log_contents:
            if 'NoLocatorException' in line:
                exception_found = True
            if 'terminal_exception' in line:
                terminal_exception_thrown = True
            if exception_found and terminal_exception_thrown:
                break
        assert exception_found
        assert terminal_exception_thrown
        bad_locator_logfile.unlink()

        # nonexistent locations file
        args = [_FETCH_COMMAND,
                '--location-file', 'aint_got_one',
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        fetcher = CommandLineFetchLauncher(args, self._LOG)
        fetcher.run()
        logfile = find_newest_fetch_log_file(self.top_level)
        with open(logfile, 'r') as log:
            log_contents = log.readlines()

        exception_found = False
        terminal_exception_thrown = False
        for line in log_contents:
            if 'FileNotFoundError' in line:
                exception_found = True
            if 'terminal_exception' in line:
                terminal_exception_thrown = True
            if exception_found and terminal_exception_thrown:
                break
        assert exception_found

    def test_nothing_retrieved_if_dry_on_cmd_line(self):
        toplevel = Path(self.top_level)
        location_file = get_mini_locations_file(
            Path(toplevel, _LOCATION_FILENAME))
        args = [_FETCH_COMMAND,
                '--location-file', str(location_file),
                '--profile', TEST_PROFILE,
                '--output-dir', self.top_level,
                '--dry', '--verbose']
        fetcher = CommandLineFetchLauncher(args, self._LOG)
        output = fetcher.run()
        logfile = find_newest_fetch_log_file(self.top_level)
        assert [] == output
        assert 0 != os.path.getsize(logfile)
        location_file.unlink()

        # make sure none of these files was written
        file_count = 0
        for _ in os.walk(location_file):
            file_count += 1
        assert 0 == file_count

    def test_force_overwrite_from_cmd_line(self):
        toplevel = Path(self.top_level)
        location_file = get_mini_locations_file(toplevel / _LOCATION_FILENAME)
        dest_dir = Path(toplevel, _EB_EXTERNAL_NAME)
        dest_dir.mkdir(parents=True, exist_ok=True)

        # make a fake file to be overwritten
        fake_file = dest_dir / _ASDM_XML
        with open(fake_file, 'w') as to_write:
            to_write.write('alas, my days are numbered')

        args = [_FETCH_COMMAND,
                '--location-file', str(location_file),
                '--profile', TEST_PROFILE,
                '--output-dir', self.top_level,
                '--force']
        CommandLineFetchLauncher(args, self._LOG).run()

        sizes = dict()
        for _, _, fnames in os.walk(dest_dir):
            for fname in fnames:
                path = dest_dir / fname
                sizes[path] = os.path.getsize(path)
        assert 37 == len(sizes)
        fake_size = os.path.getsize(fake_file)
        assert 9339 == fake_size

    def test_no_overwrite_from_cmd_line(self):
        toplevel = Path(self.top_level)
        location_file = get_mini_locations_file(toplevel / _LOCATION_FILENAME)
        dest_dir = toplevel / _EB_EXTERNAL_NAME
        dest_dir.mkdir(parents=True, exist_ok=True)

        # make a fake file that shouldn't be overwritten
        fake_file = dest_dir / _ASDM_XML
        with open(fake_file, 'w') as to_write:
            to_write.write("I'm not going anywhere!")

        args = [_FETCH_COMMAND,
                '--location-file', str(location_file),
                '--profile', TEST_PROFILE,
                '--output-dir', self.top_level]
        fetcher = CommandLineFetchLauncher(args, self._LOG)
        fetcher.run()

        term_except_found = False
        file_exists_found = False
        logfile = find_newest_fetch_log_file(self.top_level)
        with open(logfile, 'r') as log:
            log_contents = log.readlines()
        for line in log_contents:
            if 'terminal_exception' in line:
                term_except_found = True
            if 'FileExistsError' in line:
                file_exists_found = True
            if term_except_found and file_exists_found:
                break
        assert term_except_found and file_exists_found

    def test_cmd_line_more_output_when_verbose(self):
        report_file = get_mini_locations_file(
            Path(self.top_level, 'locations_verbose.json'))
        args = [_FETCH_COMMAND,
                '--location-file', str(report_file),
                '--profile', TEST_PROFILE,
                '--output-dir', self.top_level,
                '--verbose']
        fetcher = CommandLineFetchLauncher(args, self._LOG)
        retrieved = fetcher.run()
        num_files_expected = 37
        assert num_files_expected == len(retrieved)

        verbose_logfile = find_newest_fetch_log_file(self.top_level)
        assert 0 != os.path.getsize(verbose_logfile)

        for file in retrieved:
            file.unlink()
        verbose_logfile.unlink()

        # same thing, but without verbose logging
        args = [_FETCH_COMMAND,
                '--location-file', str(report_file),
                '--profile', TEST_PROFILE,
                '--output-dir', self.top_level]
        fetcher = CommandLineFetchLauncher(args, self._LOG)
        retrieved = fetcher.run()
        assert num_files_expected == len(retrieved)
        logfile = find_newest_fetch_log_file(self.top_level)
        assert 0 == os.path.getsize(logfile)
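    # For reference, a location report is JSON of roughly this shape.
    # The field names are taken from the accesses in this module; the
    # server value shown is illustrative, and any field not used here
    # is an assumption:
    #
    #   {
    #     "files": [
    #       {"relative_path": "ASDM.xml",
    #        "size": 9339,
    #        "subdirectory": null,
    #        "server": {"server": "some-ngas-host:7777",
    #                   "location": "DSOC",
    #                   "cluster": "DSOC"}},
    #       ...
    #     ]
    #   }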
    def test_can_stream_from_mini_locations_file(self):
        """ gin up a location report with just a few small files in it
            and confirm that we can actually stream them
        """
        report_file = get_mini_locations_file(
            Path(self.top_level, _LOCATION_FILENAME))
        args = ['--location-file', str(report_file),
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)
        fetch = DataFetcher(namespace, self.settings)
        retrieved = fetch.run()
        file_count = len(retrieved)
        assert 37 == file_count

    def test_verbose_writes_stuff_to_log(self):
        path = Path(self.top_level, _LOCATION_FILENAME)
        report_file = get_mini_locations_file(path)
        args = ['--location-file', str(report_file),
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE,
                '--verbose']
        namespace = get_arg_parser().parse_args(args)
        fetch = DataFetcher(namespace, self.settings)
        fetch.run()

        logfile = fetch.logfile
        assert logfile.is_file()
        assert 0 != os.path.getsize(logfile)

    def test_empty_log_if_not_verbose(self):
        path = Path(self.top_level, _LOCATION_FILENAME)
        report_file = get_mini_locations_file(path)
        args = ['--location-file', str(report_file),
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)
        fetch = DataFetcher(namespace, self.settings)
        fetch.run()

        logfile = fetch.logfile
        assert logfile.is_file()
        assert 0 == os.path.getsize(logfile)

    def test_copy_attempt_throws_sys_exit_service_error(self):
        product_locator = self.test_data['13B-014']['product_locator']

        # use site from non-local profile to guarantee a copy attempt
        local_exec_site = self.settings['execution_site']
        self.settings['execution_site'] = ExecutionSite.DSOC

        args = ['--product-locator', product_locator,
                '--output-dir', self.top_level,
                '--profile', self.settings['execution_site'].value,
                '--verbose']
        parser = get_arg_parser()
        namespace = parser.parse_args(args)
        fetch = DataFetcher(namespace, self.settings)
        servers_report = fetch.servers_report
        for server in servers_report:
            entry = servers_report[server]
            assert entry['retrieve_method'].value == RetrievalMode.COPY.value

        # let's try just one file so we're not sitting here all day
        for server in servers_report:
            entry = servers_report[server]
            servers_report = {server: entry}
            break
        fetch.servers_report = servers_report
        assert fetch.servers_report[server] is not None
        files = fetch.servers_report[server]['files']
        fetch.servers_report[server]['files'] = [files[0]]

        try:
            with pytest.raises(SystemExit) as s_ex:
                fetch.run()
            assert Errors.NGAS_SERVICE_ERROR.value == s_ex.value.code
        finally:
            self.settings['execution_site'] = local_exec_site

    def test_dies_with_bad_server_info(self):
        report_file = get_locations_file('VLA_BAD_SERVER')
        args = ['--location-file', str(report_file),
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)
        fetch = DataFetcher(namespace, self.settings)
        with pytest.raises(SystemExit) as s_ex:
            fetch.run()
        exc_code = s_ex.value.code
        expected = Errors.NGAS_SERVICE_ERROR.value
        assert expected == exc_code

    def test_throws_sys_exit_file_exists_if_overwrite_not_forced(self):
        toplevel = Path(self.top_level)
        location_file = get_mini_locations_file(
            Path(self.top_level, _LOCATION_FILENAME))
        assert location_file.exists()

        destination = Path(toplevel, _EB_EXTERNAL_NAME)
        destination.mkdir(parents=True, exist_ok=True)
        assert destination.is_dir()

        # stick a fake SDM in there so it will fall over
        fake_file = Path(destination, _ASDM_XML)
        with open(fake_file, 'w') as to_write:
            to_write.write('lalalalalala')
        assert fake_file.exists()
        assert os.path.getsize(fake_file) != 0

        args = ['--location-file', str(location_file),
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)

        # exception should be thrown because one of the files to be
        # retrieved is in the destination dir and we're not forcing
        # overwrite here
        with pytest.raises(SystemExit) as exc:
            DataFetcher(namespace, self.settings).run()
        exc_code = exc.value.code
        expected = Errors.FILE_EXISTS_ERROR.value
        assert expected == exc_code
    def test_overwrites_when_forced(self):
        external_name = LOCATION_REPORTS[_VLA_SMALL_KEY]['external_name']
        toplevel = Path(self.top_level)
        destination = toplevel / external_name
        destination.mkdir(parents=True, exist_ok=True)
        assert destination.is_dir()

        # stick a fake SDM in there to see if overwrite really happens
        to_overwrite = _ASDM_XML
        fake_file = destination / to_overwrite
        text = '"Bother!" said Pooh. "Lock phasers on that heffalump!"'
        with open(fake_file, 'w') as to_write:
            to_write.write(text)
        assert fake_file.exists()
        assert len(text) == os.path.getsize(fake_file)

        report_metadata = LOCATION_REPORTS[_VLA_SMALL_KEY]
        json_path = destination / report_metadata['filename']
        report_file = get_mini_locations_file(json_path)
        args = ['--location-file', str(report_file),
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE,
                '--force']
        namespace = get_arg_parser().parse_args(args)
        report = LocationsReport(self._LOG, namespace, self.settings)

        # expecting 37 files
        files = report.files_report['files']
        sizes = [file['size'] for file in files]
        total_size_expected = sum(sizes)
        num_files_expected = 37
        assert num_files_expected == len(files)

        fetch = DataFetcher(namespace, self.settings)
        retrieved = fetch.run()
        assert num_files_expected == len(retrieved)

        # delete the .json so it doesn't mess up our total size computation
        report_file.unlink()

        total_size_actual = 0
        for dirpath, _, filenames in os.walk(destination):
            for fname in filenames:
                path = Path(dirpath, fname)
                total_size_actual += os.path.getsize(path)
        assert total_size_expected == total_size_actual

    def test_sys_exit_file_error_on_bad_destination(self):
        file_spec = self.test_data['13B-014']
        args = ['--product-locator', file_spec['product_locator'],
                '--output-dir', '/foo',
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)
        with pytest.raises(SystemExit) as s_ex:
            DataFetcher(namespace, self.settings)
        assert Errors.FILE_NOT_FOUND_ERROR.value == s_ex.value.code

    def test_sys_exit_no_locator_for_bad_product_locator(self):
        args = ['--product-locator', '/foo',
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)

        with pytest.raises(SystemExit) as s_ex:
            fetch = DataFetcher(namespace, self.settings)
            fetch.run()
        assert Errors.NO_LOCATOR.value == s_ex.value.code

    def test_gets_expected_test_data(self):
        assert self.test_data['13B-014'] is not None
        file_spec = self.test_data['13B-014']
        assert '13B-014.sb28862036.eb29155786.56782.5720116088' == \
            file_spec['external_name']
        locator = file_spec['product_locator']
        assert locator.startswith('uid://evla/execblock/')
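    # ----------------------------------------------------------------------
    # The next several tests share a pattern: parse a canned location
    # report, then mock out the (slow) retrieval itself. A minimal sketch
    # of the pattern, using only names already present in this module:
    #
    #   fetch = DataFetcher(namespace, self.settings)
    #   expected = [Path(self.top_level, item['relative_path'])
    #               for item in fetch.locations_report.files_report['files']]
    #   fetch.run = MagicMock(return_value=expected)  # skip the real I/O
    #   assert len(expected) == len(fetch.run())
    # ----------------------------------------------------------------------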
    def test_gets_vlbas_from_report_file(self):
        report_file = get_locations_file('VLBA_EB')
        args = ['--location-file', str(report_file),
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)
        fetch = DataFetcher(namespace, self.settings)
        report_files = fetch.locations_report.files_report['files']
        assert 16 == len(report_files)

        expected_files = [Path(self.top_level, item['relative_path'])
                          for item in report_files]
        # the files we'd be getting take waaaaayyy too long to fetch in a
        # test case, so we're mocking DataFetcher.run()
        fetch.run = MagicMock(return_value=expected_files)
        actual_files = fetch.run()
        num_expected = len(expected_files)
        assert num_expected == len(actual_files)

        match_count = 0
        for exp_file in expected_files:
            for act_file in actual_files:
                if act_file.name == exp_file.name:
                    match_count += 1
                    break
        assert num_expected == match_count

    def test_gets_large_vla_ebs_from_report_file(self):
        report_file = get_locations_file('VLA_LARGE_EB')
        args = ['--location-file', str(report_file),
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)
        fetch = DataFetcher(namespace, self.settings)
        report_files = fetch.locations_report.files_report['files']
        assert 46 == len(report_files)
        toplevel = Path(self.top_level)
        expected_files = [toplevel / item['relative_path']
                          for item in report_files]
        fetch.run = MagicMock(return_value=expected_files)
        actual_files = fetch.run()
        num_expected = len(expected_files)
        assert num_expected == len(actual_files)

    def test_gets_images_from_report_file(self):
        report_file = get_locations_file('IMG')
        args = ['--location-file', str(report_file),
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)
        fetch = DataFetcher(namespace, self.settings)
        report_files = fetch.locations_report.files_report['files']
        assert 2 == len(report_files)
        toplevel = Path(self.top_level)
        expected_files = [toplevel / item['relative_path']
                          for item in report_files]
        # the files are too big to fetch in a test; mock DataFetcher.run()
        fetch.run = MagicMock(return_value=expected_files)
        actual_files = fetch.run()
        num_expected = len(expected_files)
        assert num_expected == len(actual_files)

    def test_gets_calibration_from_report_file(self):
        report_file = get_locations_file('CALIBRATION')
        args = ['--location-file', str(report_file),
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)
        fetch = DataFetcher(namespace, self.settings)
        report_files = fetch.locations_report.files_report['files']
        assert 1 == len(report_files)

        # a calibration will have external name = relative path = subdirectory
        file_spec = report_files[0]
        relative_path = file_spec['relative_path']
        assert relative_path == file_spec['subdirectory']

        expected_files = [Path(self.top_level, relative_path)]
        fetch.run = MagicMock(return_value=expected_files)
        actual_files = fetch.run()
        num_expected = len(expected_files)
        assert num_expected == len(actual_files)

    def test_gets_calibration_from_locator(self):
        external_name = LOCATION_REPORTS['CALIBRATION']['external_name']
        product_locator = ProductLocatorLookup(
            self.db_settings).look_up_locator_for_ext_name(external_name)
        args = ['--product-locator', product_locator,
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)
        fetch = DataFetcher(namespace, self.settings)
        report_files = fetch.locations_report.files_report['files']
        assert 1 == len(report_files)

        # a calibration will have external name = relative path = subdirectory
        file_spec = report_files[0]
        relative_path = file_spec['relative_path']
        assert external_name == relative_path
        assert relative_path == file_spec['subdirectory']

        expected_files = [Path(self.top_level) / relative_path]
        fetch.run = MagicMock(return_value=expected_files)
        actual_files = fetch.run()
        num_expected = len(expected_files)
        assert num_expected == len(actual_files)
    def test_retrieval_finds_size_mismatch(self):
        report_spec = LOCATION_REPORTS[_VLA_SMALL_KEY]
        external_name = report_spec['external_name']

        data_dir = Path(self.DATA_DIR)
        locations_file = data_dir / 'VLA_SMALL_EB_BUSTED.json'
        args = ['--location-file', str(locations_file),
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)
        fetch1 = DataFetcher(namespace, self.settings)
        report_files = fetch1.locations_report.files_report['files']
        assert 44 == len(report_files)

        filename = 'Weather.xml'
        for file in report_files:
            if filename == file['relative_path']:
                assert 165100 == file['size']
                break

        product_locator = ProductLocatorLookup(self.db_settings) \
            .look_up_locator_for_ext_name(external_name)
        args = ['--product-locator', product_locator,
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)
        fetch2 = DataFetcher(namespace, self.settings)

        locations_report = get_locations_report(_VLA_SMALL_KEY)
        fetch2.run = MagicMock(return_value=locations_report['files'])
        locator_files = fetch2.run()
        assert len(report_files) == len(locator_files)
        for file1 in report_files:
            for file2 in locator_files:
                if file2['relative_path'] == file1['relative_path']:
                    if filename != file1['relative_path']:
                        assert file2['size'] == file1['size']
                    else:
                        # the "busted" report lies about this file's size
                        assert file2['size'] != file1['size']
                    break

    def test_throws_sys_exit_missing_setting_if_no_args(self):
        args = []
        with pytest.raises(SystemExit) as s_ex:
            get_arg_parser().parse_args(args)
        assert Errors.MISSING_SETTING.value == s_ex.value.code

    def test_throws_sys_exit_no_locator_if_no_product_locator(self):
        args = ['--product-locator', '',
                '--output-dir', self.top_level,
                '--profile', TEST_PROFILE]
        namespace = get_arg_parser().parse_args(args)

        with pytest.raises(SystemExit) as s_ex:
            DataFetcher(namespace, self.settings)
        assert Errors.NO_LOCATOR.value == s_ex.value.code

    # --------------------------------------------------------------------
    #
    #        U T I L I T I E S
    #
    # --------------------------------------------------------------------

    @staticmethod
    def _remove_large_files_from_location_report(locations_in: LocationsReport):
        ''' strip files > 100000 bytes from a location report, so we can try
            an actual stream without it taking forever

            :returns: LocationsReport
        '''
        files = locations_in['files']
        locations_out = locations_in.copy()
        locations_out['files'] = \
            [file for file in files if file['size'] <= 100000]
        return locations_out
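    # Hypothetical usage of the helper above (it is not exercised by the
    # tests in this module); 'full_report' stands in for any locations
    # report dict with a 'files' list:
    #
    #   small = TestDataFetcher._remove_large_files_from_location_report(
    #       full_report)
    #   assert all(f['size'] <= 100000 for f in small['files'])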
    def _initialize_test_data(self):
        ext_name = '13B-014.sb28862036.eb29155786.56782.5720116088'
        product_locator = ProductLocatorLookup(self.db_settings) \
            .look_up_locator_for_ext_name(ext_name)
        dict13b = {'external_name': ext_name,
                   'product_locator': product_locator}
        to_return = {'13B-014': dict13b}
        return to_return


class CommandLineFetchLauncher:
    """ Launches DataFetcher from the command line, with logging """

    def __init__(self, args: List, logger: FlexLogger):
        args_to_parse = args if args[0] != _FETCH_COMMAND else args[1:]
        self._LOG = logger
        namespace = get_arg_parser().parse_args(args_to_parse)
        self.args = args
        self.output_dir = Path(namespace.output_dir)

        if not self.output_dir.is_dir():
            raise FileNotFoundError(f'{self.output_dir} not found')
        elif not os.access(self.output_dir, os.R_OK):
            raise PermissionError(f'{self.output_dir} is not readable')
        self.verbose = namespace.verbose

    def run(self):
        ''' launch fetch from the command line

            :returns: directory listing
        '''
        with subprocess.Popen(self.args,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
                              bufsize=1,
                              universal_newlines=True) as proc:
            return self.analyze_output(proc)

    def analyze_output(self, proc):
        # stderr is merged into stdout above, so this is normally None
        if proc.stderr:
            for err in proc.stderr:
                self._LOG.error(err.strip())

        output = proc.stdout
        # in text mode, TextIOWrapper.errors is the codec's error-handling
        # scheme; anything other than the default 'strict' is worth logging
        error_found = output.errors
        if error_found:
            if isinstance(error_found, list):
                for line in error_found:
                    self._LOG.error(line)
            elif error_found != 'strict':
                self._LOG.error(error_found)

        lines = list()
        for line in output:
            lines.append(line.strip())

        for i in range(len(lines)):
            line = lines[i]
            self._LOG.debug(f'{line}')
            if 'error' in line.lower():
                # log everything from the error on
                for j in range(i, len(lines)):
                    self._LOG.error(lines[j])
            if 'debug' in line.lower() and self.verbose:
                self._LOG.debug(line)
            if 'warn' in line.lower():
                self._LOG.warning(line)

        files_retrieved = list()
        for root, dirnames, filenames in os.walk(self.output_dir):
            root_dir = Path(root)
            subdir = root_dir / dirnames[0] if dirnames else root_dir
            for filename in filenames:
                if not filename.endswith('.log') \
                        and not filename.endswith('.json'):
                    files_retrieved.append(subdir / filename)
        self._LOG.debug(f'>>> {len(files_retrieved)} files retrieved')
        return files_retrieved
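# A minimal sketch of how CommandLineFetchLauncher is driven; the args list
# mirrors those built in the tests above, and the output dir shown is an
# illustrative assumption:
#
#   args = [_FETCH_COMMAND,
#           '--location-file', 'locations.json',
#           '--profile', TEST_PROFILE,
#           '--output-dir', '/var/tmp']
#   launcher = CommandLineFetchLauncher(args, FlexLogger(__name__, '/var/tmp'))
#   retrieved = launcher.run()  # list of Paths, minus .log and .json files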