Skip to content
Snippets Groups Projects
df_pytest_utils.py 10.1 KiB
Newer Older
Janet Goldstein's avatar
Janet Goldstein committed
""" Various conveniences for use and re-use in test cases """

import json
Janet Goldstein's avatar
Janet Goldstein committed
import os
Janet Goldstein's avatar
Janet Goldstein committed
from pathlib import Path

# Put the absolute paths of the current working directory and of its parent
# at the front of sys.path, ahead of every entry already there.
sys.path.insert(0, str(Path('.').absolute()))
sys.path.insert(0, str(Path('..').absolute()))


# TODO: Some Fine Day: this duplicates same function in package tester.
#  CAVEAT PROGRAMMOR: attempts to centralize it have resulted in tears.
def get_project_root(start: Path = None) -> Path:
    """
    Get the root of this project.

    Walks upward from ``start`` (the path itself is examined first, then
    each parent in turn) and stops at the first path whose final component
    ends with 'data' or 'code'.

    :param start: path at which to begin the search; defaults to the
        location of this file
    :return: the first path, walking upward, whose name ends with
        'data' or 'code'
    :raises FileNotFoundError: if the top of the path is reached without
        finding such a component
    """
    path = Path(__file__) if start is None else Path(start)
    while not path.name.endswith(('data', 'code')):
        parent = path.parent
        if parent == path:
            # At the top ('/' for an absolute path, '.' for a relative one)
            # a path is its own parent, so without this check the loop
            # would spin forever.
            raise FileNotFoundError(
                f'no directory ending in "data" or "code" found above '
                f'{start if start is not None else __file__}')
        path = parent

    return path

# pylint: disable=C0115, C0116, C0200, R0902, R0903, R0914, R1721, W0212, W0613, W0621, W0703, W1203
# Put the project root at the front of sys.path; presumably this is what lets
# the project-level imports that follow resolve -- confirm.
sys.path.insert(0, str(get_project_root()))
from shared.workspaces.test.utilities import get_locations_report, \
    get_test_data_dir

from datafetcher.datafetcher import DataFetcher
from datafetcher.return_codes import ReturnCode
from datafetcher.errors import MissingSettingsException, NoProfileException
from datafetcher.locations_report import LocationsReport
from datafetcher.utilities import REQUIRED_SETTINGS, get_arg_parser, \
    ExecutionSite, ProductLocatorLookup, RetrievalMode
# Codes read from the 'code' entry of the matching ReturnCode member's value;
# launch_datafetcher() and evaluate_args_and_capo() return/exit with them.
MISSING_SETTING = ReturnCode.MISSING_SETTING.value['code']
MISSING_PROFILE = ReturnCode.MISSING_PROFILE.value['code']
# NOTE(review): RUN_ALL is not referenced in the visible part of this file;
# presumably a switch consulted by the test modules -- confirm.
RUN_ALL = True
Janet Goldstein's avatar
Janet Goldstein committed

# Specifications of the canned locations reports used by the tests, keyed by
# report name. Each entry names the JSON file holding the report and the
# external name, file count and server count associated with it.
LOCATION_REPORTS = {
    'VLA_SMALL_EB': {
        'filename': 'VLA_SMALL_EB.json',
        'external_name': 'sysstartS.58955.83384832176',
        'file_count': 44,
        'server_count': 2,
    },
    'VLA_LARGE_EB': {
        'filename': 'VLA_LARGE_EB.json',
        'external_name': '17B-197.sb34812522.eb35115211.58168.58572621528',
        'file_count': 46,
        'server_count': 2,
    },
    'VLA_BAD_SERVER': {
        'filename': 'VLA_BAD_SERVER.json',
        'external_name': 'TSKY_20min_B2319_18ms_001.58955.86469591435',
        'file_count': 1,
        'server_count': 1,
    },
    'IMG': {
        # NOTE(review): this entry had lost its keys and was a bare string
        # inside braces (i.e. a set). 'filename' follows the pattern of the
        # other entries and the string is assumed to be the external name.
        # TODO: restore 'file_count' and 'server_count' from version control.
        'filename': 'IMG.json',
        'external_name':
            'VLASS1.1.ql.T01t01.J000232-383000.10.2048.v1',
    },
    'VLBA_EB': {
        'filename': 'VLBA_EB.json',
        'external_name': '',
        'file_count': 16,
        'server_count': 1,
    },
    'CALIBRATION': {
        'filename': 'CALIBRATION.json',
        'external_name': '18B-265_2019_12_10_T00_00_59.203.tar',
        'file_count': 1,
        'server_count': 1,
    },

}


def get_locations_file(key: str):
    """
    Return location report file specified by key

    :param key: location report name; must be a key of LOCATION_REPORTS
    :return: path of the report's JSON file
    :raises KeyError: if key is not in LOCATION_REPORTS

    """

    report_spec = LOCATION_REPORTS[key]
    filename = report_spec['filename']
    # NOTE(review): the return statement was missing, so the function yielded
    # None. This assumes get_test_data_dir() takes no arguments and returns
    # the directory that holds the report files -- confirm.
    return Path(get_test_data_dir(), filename)


def write_locations_file(destination: Path,
                         locations_report: 'LocationsReport'):
    """
    Write the 'files' section of a locations report to a JSON file.

    Only the 'files' entry is written, wrapped in a one-key object and
    indented by four spaces; any other entries of the report are left out.

    :param destination: where locations file is to be written
    :param locations_report: locations report from which we'll write the
        file; it is read with ``locations_report['files']``

    :return: destination, unchanged

    """

    to_dump = {'files': locations_report['files']}
    with open(destination, 'w', encoding='utf-8') as to_write:
        json.dump(to_dump, to_write, indent=4)
    return destination

Janet Goldstein's avatar
Janet Goldstein committed
def get_mini_exec_block(max_size: int = 100000):
    """
    Returns a location report with large files excised

    Fetches the 'VLA_SMALL_EB' report via get_locations_report(), copies it
    with ``.copy()`` and replaces the copy's 'files' entry with only those
    files whose 'size' does not exceed ``max_size``.

    :param max_size: largest file 'size' to keep (inclusive)
    :return: attenuated location report

    """

    locations_in = get_locations_report('VLA_SMALL_EB')
    locations_out = locations_in.copy()
    # 'files' is rebound on the copy rather than filtered in place, so the
    # list held by locations_in is left as it was.
    locations_out['files'] = \
        [file for file in locations_in['files'] if file['size'] <= max_size]
    return locations_out

Janet Goldstein's avatar
Janet Goldstein committed
def get_mini_locations_file(destination):
    """
    Returns a location report file with large files excised

    Builds the reduced report with get_mini_exec_block() and writes it with
    write_locations_file(), rather than repeating the JSON-writing code here.

    :param destination: where the reduced locations file is to be written
    :return: downsized location report file

    """

    return write_locations_file(destination, get_mini_exec_block())

def get_filenames_for_locator(product_locator: str, settings: dict):
Janet Goldstein's avatar
Janet Goldstein committed
    For a given product locators, return names of all the files
    in its locations report's files report
Janet Goldstein's avatar
Janet Goldstein committed
    :param product_locator:
    :param settings:
    :return:
Janet Goldstein's avatar
Janet Goldstein committed
    args = ['--product-locator', product_locator,
Janet Goldstein's avatar
Janet Goldstein committed
    namespace = get_arg_parser().parse_args(args)
    locations_report = LocationsReport(None, namespace, settings)
Janet Goldstein's avatar
Janet Goldstein committed

    return [file['relative_path'] for file in
            locations_report.files_report['files']]

def find_newest_fetch_log_file(target_dir: Path):
    """
    Data-fetcher command line was executed, perhaps more than once;

    :param target_dir: location of log file(s)
    :return:
    """
Janet Goldstein's avatar
Janet Goldstein committed
        for filename in filenames:
            if filename.startswith('DataFetcher') \

    if logfiles:
        return max(logfiles, key=os.path.getctime)

    return None
def get_test_capo_settings():
    """ get the capo settings we'll need for the tests """
    capo = CapoConfig(profile=TEST_PROFILE)
    result = dict()
    for setting in REQUIRED_SETTINGS:
        setting = setting.upper()
        try:
            result[REQUIRED_SETTINGS[setting]] = capo[setting]
            raise MissingSettingsException('missing required setting "{}"'
    if result is None or len(result) == 0:
        raise MissingSettingsException('Required Capo settings were not found')
    # be sure execution site is not DSOC nor NAASC
    exec_site = result['execution_site']
    if ExecutionSite.DSOC.value in exec_site or ExecutionSite.NAASC.value in \
            exec_site:
        result['execution_site'] = 'local_test'

    # be sure download location is accessible
    dl_loc = result['download_dir']
    if not Path('/lustre').is_dir() and '/lustre' in dl_loc:
        result['download_dir'] = '/var/tmp/'

    return result

def get_metadata_db_settings(profile):
    """ Get Capo settings needed to connect to archive DB
    """
    result = dict()
    if profile is None:
        raise NoProfileException('CAPO_PROFILE required; none provided')
    capo = CapoConfig(profile=TEST_PROFILE)
    fields = ['jdbcDriver', 'jdbcUrl', 'jdbcUsername', 'jdbcPassword']
    qualified_fields = ['metadataDatabase.' + field for field in fields]
    for field in qualified_fields:
        try:
            raise MissingSettingsException(
                f'missing required setting "{field}"') from k_err
def make_tempdir() -> Path:
    """
    Creates a new temporary working directory for each test.

    The directory is created under /var/tmp with the prefix
    'datafetcher_test_'. The process umask is set to 0 for the duration of
    the call to tempfile.mkdtemp and put back afterwards, whether or not
    the directory could be created.

    NOTE(review): this is a generator and the value it yields is the string
    returned by tempfile.mkdtemp, not a Path as the annotation suggests.

    :return: yields the name of the new directory
    """
    umask = os.umask(0o000)
    try:
        top_level = tempfile.mkdtemp(prefix='datafetcher_test_',
                                     dir='/var/tmp')
    finally:
        # restore the previous umask even when mkdtemp raises
        os.umask(umask)
    yield top_level


@pytest.fixture(scope='session')
def capo_settings():
    """
    Session-scoped fixture: fetches the test Capo settings a single time
    and hands the same result to every test that requests it.

    :return: yields whatever get_test_capo_settings() returns
    """

    yield get_test_capo_settings()


@pytest.fixture(scope='session')
def settings(capo_settings):
    """
    Grabs all the settings we will need for the datafetcher:
    the Capo settings, the metadata-database settings and the test data.

    :param capo_settings: value supplied by the capo_settings fixture
    :return: yields a Settings instance bundling the three
    """
    db_settings = get_metadata_db_settings(TEST_PROFILE)
    test_data = _initialize_test_data(db_settings)
    yield Settings(capo_settings, db_settings, test_data)


def _initialize_test_data(db_settings):
    """
    Build the test data shared by several tests: a fixed external name and
    the product locator looked up for it.

    :param db_settings: settings handed to ProductLocatorLookup
    :return: dict with keys 'external_name' and 'product_locator'
    """

    external_name = '13B-014.sb28862036.eb29155786.56782.5720116088'
    lookup = ProductLocatorLookup(db_settings)
    locator = lookup.look_up_locator_for_ext_name(external_name)
    return dict(external_name=external_name, product_locator=locator)


class Settings:
    """ Simple holder for the settings the tests pass around """

    def __init__(self, capo_settings, db_settings, test_data):
        # stored exactly as given; no copying or validation
        (self.capo_settings,
         self.db_settings,
         self.test_data) = capo_settings, db_settings, test_data


def launch_datafetcher(args: list, df_capo_settings: dict) -> int:
    """ Run the data fetcher with the given command-line arguments and
        report the outcome as a return code.

        The arguments are turned into a namespace by
        evaluate_args_and_capo(), a DataFetcher is built from it, and the
        result of its run() is returned. A SystemExit is translated into
        its exit code; a KeyError or NoProfileException is logged and
        reported as MISSING_PROFILE; any other exception fails the test.

    :param args: command-line arguments for the data fetcher
    :param df_capo_settings: Capo settings handed to the data fetcher
    :return: MISSING_SETTING when args is None or empty, otherwise the
        code determined as described above
    """
    if args is None or len(args) < 1:
        return MISSING_SETTING

    try:
        parsed = evaluate_args_and_capo(args, df_capo_settings)
        return DataFetcher(parsed, df_capo_settings).run()
    except SystemExit as exit_exc:
        # an object carrying a 'value' attribute takes precedence over
        # the exception's own 'code'
        if hasattr(exit_exc, 'value'):
            wrapped = exit_exc.value
            return getattr(wrapped, 'code', wrapped)
        if not hasattr(exit_exc, 'code'):
            raise
        return exit_exc.code
    except (KeyError, NoProfileException) as exc:
        logging.error(f'{exc}')
        return MISSING_PROFILE
    except Exception as exc:
        pytest.fail(f'{exc}')


def evaluate_args_and_capo(args: list, capo_settings: dict):
    """
    Parse data-fetcher command-line arguments, falling back on the Capo
    settings for the profile.

    Exits with MISSING_SETTING when there are no arguments. When
    get_profile_from_args() reports no profile (None), the profile is taken
    from ``capo_settings['profile']``; if that is None too, exits with
    MISSING_PROFILE, otherwise '--profile <profile>' is added to the
    arguments before they are parsed.

    NOTE(review): get_profile_from_args() in this file returns '' rather
    than None when '--profile' is absent, so the fallback is taken only if
    that helper returns None -- confirm which sentinel is intended.

    :param args: command-line arguments; the caller's list is not modified
    :param capo_settings: Capo settings; 'profile' is read on fallback
    :return: the namespace produced by the data fetcher's argument parser
    :raises KeyError: if the fallback is needed and capo_settings has no
        'profile' key
    """

    if not args:
        sys.exit(MISSING_SETTING)

    profile = get_profile_from_args(args)
    if profile is None:
        profile = capo_settings['profile']
        if profile is None:
            sys.exit(MISSING_PROFILE)
        # args is a list of command-line tokens, so the profile has to be
        # appended as an option/value pair; `args['profile'] = profile`
        # raised TypeError because a list cannot be indexed by a string.
        args = [*args, '--profile', profile]

    return get_arg_parser().parse_args(args)


def get_profile_from_args(args: list) -> str:
    """
    Find the value given for '--profile' in a list of arguments.

    :param args: command-line arguments
    :return: the argument immediately after the first '--profile' that has
        a successor; the empty string if there is none
    """
    # pair every argument with the one that follows it; a trailing
    # '--profile' has no successor and so never appears as a flag here
    for flag, value in zip(args, args[1:]):
        if flag == '--profile':
            return value

    return ''


def confirm_retrieve_mode_copy(servers_report: dict) -> None:
    """
    Assert that every entry of a servers report has a 'retrieve_method'
    whose value equals that of RetrievalMode.COPY.

    :param servers_report: mapping whose values each hold 'retrieve_method'
    :raises AssertionError: at the first entry using another method
    """
    for entry in servers_report.values():
        assert entry['retrieve_method'].value == RetrievalMode.COPY.value