""" Functional tests for the datafetcher command-line tool """
import sys
from pathlib import Path

import pytest

sys.path.insert(0, str(Path('.').absolute()))
from .df_pytest_utils import get_project_root
project_root = get_project_root()
sys.path.insert(0, str(project_root))

# pylint: disable=C0115, C0116, C0200, C0415, R0801, R0902, R0903, R0914, R1721, W0212, W0611, W0613, W0621, W0703, W1203

from datafetcher.datafetcher import DataFetcher, ReturnCode
# TODO(review): the continuation of this import was truncated; these names
# are used below but their exact source module should be confirmed
from datafetcher.utilities import get_arg_parser, ProductLocatorLookup, \
    RetrievalMode, Location, Cluster
# N.B. these are all in use; SonarLint just doesn't get it
from .df_pytest_utils import TEST_PROFILE, get_mini_locations_file, \
    get_locations_file, LOCATION_REPORTS, capo_settings, launch_datafetcher, \
    settings, make_tempdir, RUN_ALL, confirm_retrieve_mode_copy

# name of the locations-report JSON file each test writes into its tempdir
_LOCATION_FILENAME = 'locations.json'
# name of the placeholder file the overwrite tests create/check
_ASDM_XML = 'ASDM.xml'
# external name of the execution block; used as the destination subdirectory
_EB_EXTERNAL_NAME = 'sysstartS.58955.83384832176'

# set this to False when debugging one or more tests
# so as not to have to sit thru every test;
# comment out the target test(s)' @pytest.skip
    """ Ensure that the test settings we're using make sense """
    assert capo_settings is not None
    assert isinstance(settings.capo_settings, dict)
    assert settings.db_settings is not None
    assert isinstance(settings.db_settings, dict)
    assert settings.test_data is not None
    assert isinstance(settings.test_data, dict)


@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_usage_statement_makes_sense():
    """ Ensure that the datafetcher's "usage" statement is as we expect """
    usage = DataFetcher._build_usage_message()
    assert usage.startswith('Usage:')
    # expect at least one line per return code, plus the header line
    usage_lines = usage.split('\n')
    assert len(usage_lines) >= len(ReturnCode) + 1
@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_nothing_retrieved_if_dry_locator(make_tempdir, settings):
    """ Simulates dry run with product locator """

    fetch_args = ['--product-locator', settings.test_data['product_locator'],
                  '--output-dir', str(make_tempdir),
                  '--profile', TEST_PROFILE,
                  '--dry-run']
    return_code = launch_datafetcher(fetch_args, settings.capo_settings)
    assert return_code == 0
    # a dry run may leave a log and the locations JSON, but nothing else
    for entry in Path(make_tempdir).iterdir():
        name = str(entry)
        if not (name.endswith('.log') or name.endswith('.json')):
            pytest.fail('dry run with product locator -DID- fetch files')

@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_nothing_retrieved_if_dry_file(make_tempdir, settings):
    """ Simulates dry run with a locations file """
    output_dir = Path(make_tempdir)
    locations_file = get_mini_locations_file(Path(output_dir, _LOCATION_FILENAME))
    args = ['--location-file', str(locations_file),
            '--output-dir', str(make_tempdir),
            '--profile', TEST_PROFILE,
            '--dry-run'
            ]
    # the sibling dry-run test asserts success; do the same here instead
    # of discarding the return code
    return_code = launch_datafetcher(args, settings.capo_settings)
    assert return_code == 0
    tempdir_files = output_dir.iterdir()
    for file in tempdir_files:
        if not str(file).endswith('.log') \
                and not str(file).endswith('.json'):
            pytest.fail('dry run with locations file -DID- fetch files')

@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_force_flag_overwrites_existing_file(make_tempdir, settings):
    """ With --force, a pre-existing file must be overwritten by the fetch """
    top_level = Path(make_tempdir)
    location_file = get_mini_locations_file(top_level / _LOCATION_FILENAME)
    dest_dir = Path(top_level, _EB_EXTERNAL_NAME)
    dest_dir.mkdir(parents=True, exist_ok=True)

    # make a fake file to be overwritten
    fake_file = dest_dir / _ASDM_XML
    with open(fake_file, 'w') as to_write:
        to_write.write('alas, my days are numbered')
    # remember the placeholder's size so we can tell it was replaced
    fake_size = fake_file.stat().st_size

    args = ['--location-file', str(location_file),
            '--profile', TEST_PROFILE,
            '--output-dir', str(top_level),
            '--force']
    try:
        launch_datafetcher(args, settings.capo_settings)
    except SystemExit as ex:
        pytest.fail(f'{ex}')
    except Exception as exc:
        pytest.fail(f'{exc}')

    sizes = dict()

    # go thru destination directory recursively
    # and get everybody's size
    for file in dest_dir.rglob('*'):
        sizes[str(file)] = file.stat().st_size

    # NOTE(review): the original block built `sizes` but asserted nothing;
    # assert the placeholder was actually overwritten (its size changed)
    assert str(fake_file) in sizes
    assert sizes[str(fake_file)] != fake_size
@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_no_overwrite_without_force(make_tempdir, settings):
    """ Without --force, a pre-existing file must be left untouched
        and the fetch must fail """
    top_level = Path(make_tempdir)
    location_file = get_mini_locations_file(top_level / _LOCATION_FILENAME)
    dest_dir = Path(top_level, _EB_EXTERNAL_NAME)
    dest_dir.mkdir(parents=True, exist_ok=True)

    # make a fake file that must NOT be overwritten
    fake_file = dest_dir / _ASDM_XML
    with open(fake_file, 'w') as to_write:
        to_write.write('dang! what a kick in the rubber parts!')
    # remember the placeholder's size so we can confirm it was untouched
    expected_size = fake_file.stat().st_size

    args = ['--location-file', str(location_file),
            '--profile', TEST_PROFILE,
            '--output-dir', str(top_level)
            ]

    return_code = launch_datafetcher(args, settings.capo_settings)
    assert return_code == ReturnCode.NGAS_FETCH_ERROR.value['code']

    sizes = dict()
    for file in dest_dir.rglob('*'):
        sizes[str(file)] = file.stat().st_size
    assert len(sizes) < 37
    # NOTE(review): the original block computed fake_size without asserting;
    # confirm the existing file was not overwritten
    fake_size = fake_file.stat().st_size
    assert fake_size == expected_size
@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_more_output_when_verbose(make_tempdir, settings):
    """ A --verbose fetch should produce a non-empty log file;
        the same fetch without --verbose should produce an empty one """
    top_level = Path(make_tempdir)
    location_file = get_mini_locations_file(top_level / _LOCATION_FILENAME)

    args = ['--location-file', str(location_file),
            '--profile', TEST_PROFILE,
            '--output-dir', str(make_tempdir),
            '--verbose']

    return_code = launch_datafetcher(args, settings.capo_settings)
    assert return_code == ReturnCode.SUCCESS.value['code']

    num_files_expected = 37
    retrieved = [file for file in top_level.rglob('*')
                 if file.is_file()]
    assert num_files_expected == len(retrieved) - 2

    verbose_logfile = next(
        (file for file in retrieved if str(file).endswith('.log')), None)
    assert verbose_logfile is not None
    verbose_log_size = verbose_logfile.stat().st_size
    assert verbose_log_size > 0

    # get rid of all the files we downloaded, plus the log
    # (a plain loop: the original used a comprehension over unlink(),
    # which builds a throwaway list of Nones purely for its side effects)
    deleted_count = 0
    for file in retrieved:
        if not str(file).endswith('.json'):
            file.unlink()
            deleted_count += 1
    assert deleted_count >= num_files_expected

    # same download, but without verbose logging
    args = ['--location-file', str(location_file),
            '--profile', TEST_PROFILE,
            '--output-dir', str(make_tempdir)]

    return_code = launch_datafetcher(args, settings.capo_settings)
    assert return_code == ReturnCode.SUCCESS.value['code']

    retrieved = list(top_level.rglob('*'))
    assert len(retrieved) == num_files_expected + 3

    logfile = next(
        (file for file in retrieved if str(file).endswith('.log')), None)
    assert logfile is not None

    logsize = logfile.stat().st_size
    # successful download, non-verbose logging,
    # should result in zero-size log file
    assert logsize == 0

@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_copy_attempt_throws_sys_exit_service_error(make_tempdir, settings):
    """ We set profile to dsoc-prod here so as to force the DF
        to try to copy rather than stream
    """

    # N.B. can't do this import with the rest of the imports up top,
    # because test_df_return_codes not yet initialized
    from .test_df_return_codes import we_are_in_docker
    if we_are_in_docker():
        # this test doesn't work in a docker container:
        # locator service URL in capo profile is None,
        # even if we write a fake props file inside the test.
        # Instead, we mock this in test_df_return_codes
        return

    # NOTE(review): prod_profile was undefined and the block below was
    # orphaned at an extra indent level; restored the profile definition
    # (per the docstring) and a try/finally so the props file is cleaned up
    prod_profile = 'dsoc-prod'
    prod_props_filename = prod_profile + '.properties'
    props_file = Path(make_tempdir, prod_props_filename)
    try:
        args = ['--product-locator', settings.test_data['product_locator'],
                '--output-dir', str(make_tempdir),
                '--profile', prod_profile]
        namespace = get_arg_parser().parse_args(args)
        fetcher = DataFetcher(namespace, settings.capo_settings)

        servers_report = fetcher.servers_report
        confirm_retrieve_mode_copy(servers_report)

        # let's try just one file so we're not sitting here all day
        for server in servers_report:
            entry = servers_report[server]
            servers_report = {server: entry}
            fetcher.servers_report = servers_report
            assert fetcher.servers_report[server] is not None
            files = fetcher.servers_report[server]['files']
            fetcher.servers_report[server]['files'] = [files[0]]
            break

        with pytest.raises(SystemExit) as exc:
            fetcher.run()
        assert exc.value.code == \
               ReturnCode.CATASTROPHIC_REQUEST_ERROR.value['code']
    finally:
        # don't leave a stray properties file behind for other tests
        if props_file.exists():
            props_file.unlink()
        assert not props_file.exists()

@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_dies_with_bad_server_info(make_tempdir, settings):
    """ A locations file naming a bad NGAS server should fail the fetch """
    location_file = get_locations_file('VLA_BAD_SERVER')
    args = ['--profile', TEST_PROFILE, '--output-dir', str(make_tempdir),
            '--location-file', str(location_file)]
    # NOTE(review): the original had an `except` with no `try`, subscripted
    # an Enum member directly, and read `.value.code` off a plain exception;
    # restored as a return-code check consistent with
    # test_no_overwrite_without_force
    return_code = launch_datafetcher(args, settings.capo_settings)
    assert return_code == ReturnCode.NGAS_FETCH_ERROR.value['code']


@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_missing_setting_error_on_bad_destination(settings):
    """ A nonexistent output directory should yield MISSING_SETTING """
    args = ['--profile', TEST_PROFILE,
            '--product-locator', settings.test_data['product_locator'],
            '--output-dir', 'floob']
    # NOTE(review): restored from an orphaned `except` with no `try`;
    # check the return code like the file's other failure-path tests
    return_code = launch_datafetcher(args, settings.capo_settings)
    assert return_code == ReturnCode.MISSING_SETTING.value['code']


def write_fake_file(destination: Path, file_info: dict):
    """ Write a placeholder file named after the NGAS file id whose sole
        content is the reported size (followed by a newline), so tests can
        verify metadata round-trips without real downloads.

        :param destination: directory to write into
        :param file_info: file metadata with 'ngas_file_id' and 'size' keys
    """
    path = Path(destination, file_info['ngas_file_id'])
    # Path.write_text replaces the manual open/write; the redundant str()
    # inside the original f-string is dropped (f-strings stringify anyway)
    path.write_text(f'{file_info["size"]}\n')


class MockSuccessfulFetchReturn:
    """ Stand-in for a fetch whose run() always reports success """

    @staticmethod
    def run():
        """ Mimic a successful fetch: exit code 0 """
        return 0

@pytest.fixture
def mock_successful_fetch_run(monkeypatch):
    """ Replace DataFetcher.run with a stub that always succeeds,
        so tests never perform a real retrieval """

    def _fake_run(*args, **kwargs):
        return MockSuccessfulFetchReturn().run()

    monkeypatch.setattr(DataFetcher, "run", _fake_run)


@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_gets_vlbas_from_report_file(mock_successful_fetch_run,
                                     make_tempdir,
                                     settings):
    """ A VLBA locations report should describe exactly one NGAS host
        streaming 16 files from DSOC; fake each file and verify sizes """
    location_file = get_locations_file('VLBA_EB')
    args = ['--profile', TEST_PROFILE, '--output-dir', str(make_tempdir),
            '--location-file', str(location_file)]
    fetcher = DataFetcher(get_arg_parser().parse_args(args),
                          settings.capo_settings)
    servers_report = fetcher.servers_report
    assert len(servers_report) == 1

    assert fetcher.run() == 0

    dest_dir = Path(make_tempdir)
    file_info_dict = {}
    for server_url, values in servers_report.items():
        assert server_url == 'nmngas03.aoc.nrao.edu:7777'
        assert values['location'] == Location.DSOC.value
        assert values['cluster'] == Cluster.DSOC.value
        assert values['retrieve_method'] == RetrievalMode.STREAM
        file_values = values['files']
        assert len(file_values) == 16
        for file_spec in file_values:
            write_fake_file(dest_dir, file_spec)
            file_info_dict[file_spec['ngas_file_id']] = file_spec

    datafetcher = DataFetcher(get_arg_parser().parse_args(args),
                              settings.capo_settings)
    assert datafetcher.run() == 0

    # each fake file's content must match the size recorded in its metadata
    for ngas_id, file_spec in file_info_dict.items():
        path = Path(dest_dir, ngas_id)
        assert path.is_file()
        assert int(path.read_text().strip()) == file_spec['size']

@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_gets_large_vla_ebs_from_report_file(mock_successful_fetch_run,
                                             make_tempdir,
                                             settings):
    """ Verify per-server metadata for a small VLA EB locations report,
        fake up the listed files, and check each fake's recorded size.
    """
    location_file = get_locations_file('VLA_SMALL_EB')
    args = ['--profile', TEST_PROFILE, '--output-dir', str(make_tempdir),
            '--location-file', str(location_file)]
    # NOTE(review): passes the module-level `capo_settings` import here,
    # while sibling tests pass `settings.capo_settings` -- confirm these
    # are equivalent
    fetcher = DataFetcher(get_arg_parser().parse_args(args), capo_settings)
    servers_report = fetcher.servers_report
    assert len(servers_report) == 2

    # run() is stubbed to return 0 by the mock_successful_fetch_run fixture
    return_code = fetcher.run()
    assert return_code == 0

    server_file_count = {'nmngas03.aoc.nrao.edu:7777': 0,
                         'nmngas04.aoc.nrao.edu:7777': 0}
    dest_dir = Path(make_tempdir)
    file_list = list()
    for server in servers_report.items():
        server_url = server[0]
        assert server_url in server_file_count.keys()
        values = server[1]
        assert values['location'] == Location.DSOC.value
        assert values['cluster'] == Cluster.DSOC.value
        assert values['retrieve_method'] == RetrievalMode.STREAM
        file_values = values['files']
        server_file_count[server_url] += len(file_values)

        for filename in file_values:
            write_fake_file(dest_dir, filename)
            # NOTE(review): this appends the whole per-server `values` dict
            # once per file, so file_list holds duplicates and the loop
            # below re-verifies each server's files multiple times --
            # confirm the repetition is intended
            file_list.append(values)

    assert server_file_count['nmngas03.aoc.nrao.edu:7777'] == 3
    assert server_file_count['nmngas04.aoc.nrao.edu:7777'] == 41

    datafetcher = DataFetcher(get_arg_parser().parse_args(args),
                              capo_settings)
    return_code = datafetcher.run()
    assert return_code == 0

    found_count = 0
    for file_info in file_list:
        for file in file_info['files']:
            filename = file['ngas_file_id']
            path = Path(dest_dir, filename)
            assert path.is_file()
            contents = path.read_text().strip()
            # fake files contain their reported size (see write_fake_file)
            assert int(contents) == file['size']
        found_count += 1

    assert found_count == len(file_list)

@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_gets_images_from_report_file(mock_successful_fetch_run,
                                      make_tempdir,
                                      settings):
    """ An image locations report should name two NGAS hosts, each
        streaming one file from DSOC; fake the files and verify sizes """
    location_file = get_locations_file('IMG')
    cmd_args = ['--profile', TEST_PROFILE, '--output-dir', str(make_tempdir),
                '--location-file', str(location_file)]
    fetcher = DataFetcher(get_arg_parser().parse_args(cmd_args), capo_settings)
    servers_report = fetcher.servers_report
    assert len(servers_report) == 2

    server_file_count = {'nmngas01.aoc.nrao.edu:7777': 0,
                         'nmngas02.aoc.nrao.edu:7777': 0}
    dest_dir = Path(make_tempdir)
    file_list = []
    for server_url, values in servers_report.items():
        assert server_url in server_file_count
        assert values['location'] == Location.DSOC.value
        assert values['cluster'] == Cluster.DSOC.value
        assert values['retrieve_method'] == RetrievalMode.STREAM
        file_values = values['files']
        server_file_count[server_url] += len(file_values)

        for file_spec in file_values:
            write_fake_file(dest_dir, file_spec)
            file_list.append(values)

    # each host serves exactly one file
    assert all(count == 1 for count in server_file_count.values())

    assert fetcher.run() == 0

    found_count = 0
    for file_info in file_list:
        for file_spec in file_info['files']:
            path = Path(dest_dir, file_spec['ngas_file_id'])
            assert path.is_file()
            assert int(path.read_text().strip()) == file_spec['size']
        found_count += 1

    assert found_count == len(file_list)

@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_gets_calibration_from_report_file(mock_successful_fetch_run,
                                           make_tempdir,
                                           settings):
    """ A calibration locations report names one NGAS host; fake its
        first file and confirm the metadata round-trips """
    location_file = get_locations_file('CALIBRATION')
    cmd_args = ['--profile', TEST_PROFILE, '--output-dir', str(make_tempdir),
                '--location-file', str(location_file)]
    fetcher = DataFetcher(get_arg_parser().parse_args(cmd_args), capo_settings)
    servers_report = fetcher.servers_report
    assert len(servers_report) == 1

    fake_file = None
    file_info = None
    # (there may be more than one file; thus iteration)
    for _, metadata in servers_report.items():
        destination = Path(make_tempdir)
        file_info = metadata['files'][0]
        fake_file = Path(destination, file_info['ngas_file_id'])
        write_fake_file(destination, file_info)

    assert fake_file.is_file()
    assert int(fake_file.read_text().strip()) == file_info['size']

@pytest.mark.skipif(not RUN_ALL, reason='debug')
def test_gets_calibration_from_locator(mock_successful_fetch_run,
                                       make_tempdir,
                                       settings):
    """ Look a calibration product up by locator; its single file's
        metadata should round-trip through a fake file """
    external_name = LOCATION_REPORTS['CALIBRATION']['external_name']
    product_locator = ProductLocatorLookup(
        settings.db_settings).look_up_locator_for_ext_name(external_name)
    namespace = get_arg_parser().parse_args(
        ['--product-locator', product_locator,
         '--output-dir', str(make_tempdir), '--profile', TEST_PROFILE])
    fetch = DataFetcher(namespace, settings.capo_settings)
    report_files = fetch.locations_report.files_report['files']
    assert len(report_files) == 1

    file_spec = report_files[0]
    relative_path = file_spec['relative_path']
    # for a calibration, the relative path IS the external name
    assert external_name == relative_path
    assert relative_path == file_spec['subdirectory']
    destination = Path(make_tempdir) / relative_path
    destination.mkdir()
    write_fake_file(destination, file_spec)

    fake_file = destination / file_spec['ngas_file_id']
    assert fake_file.is_file()
    assert int(fake_file.read_text().strip()) == file_spec['size']

def test_gets_gbt_data_from_locator(make_tempdir, settings):
    """ Can we cope with GBT data? """

    external_name = 'AGBT17B_044_553492'
    product_locator = ProductLocatorLookup(settings.db_settings)\
        .look_up_locator_for_ext_name(external_name)
    namespace = get_arg_parser().parse_args(
        ['--product-locator', product_locator,
         '--output-dir', str(make_tempdir), '--profile', TEST_PROFILE])
    fetch = DataFetcher(namespace, settings.capo_settings)
    report_files = fetch.locations_report.files_report['files']
    assert len(report_files) == 1

    file_spec = report_files[0]
    relative_path = file_spec['relative_path']
    assert relative_path == 'AGBT17B_044_01.tar'
    destination = Path(make_tempdir) / relative_path
    destination.mkdir()
    write_fake_file(destination, file_spec)

    fake_file = destination / file_spec['ngas_file_id']
    assert fake_file.is_file()
    assert int(fake_file.read_text().strip()) == file_spec['size']