Skip to content
Snippets Groups Projects
df_pytest_utils.py 10.12 KiB
#!/usr/bin/python
# -*- coding: utf-8 -*-

""" Various conveniences for use and re-use in test cases """

import json
import logging
import os
import sys
import tempfile
from pathlib import Path

sys.path.insert(0, str(Path(".").absolute()))
sys.path.insert(0, str(Path("..").absolute()))


# TODO: Some Fine Day: this duplicates same function in package tester.
#  CAVEAT PROGRAMMOR: attempts to centralize it have resulted in tears.
def get_project_root() -> Path:
    """
    Get the root of this project.

    Starting at this file's own path, walks upward and stops at the first
    path whose name ends with "workspaces" or "packages".

    :return: the first matching path
    :raises FileNotFoundError: if neither this file's path nor any of its
        ancestors matches
    """
    start = Path(__file__)
    # Bounded walk: a filesystem root is its own parent, so an open-ended
    # "keep taking .parent" loop would never finish when nothing matches.
    for candidate in (start, *start.parents):
        if candidate.name.endswith(("workspaces", "packages")):
            return candidate

    raise FileNotFoundError(
        f'no ancestor of {start} has a name ending in "workspaces" or "packages"'
    )


import pytest

from pycapo import CapoConfig

# pylint: disable=C0115, C0116, C0200, R0902, R0903, R0914, R1721, W0212, W0613, W0621, W0703, W1203
sys.path.insert(0, str(get_project_root()))
from shared.workspaces.test.test_data.utilities import (
    get_locations_report,
    get_test_data_dir,
)

from datafetcher.datafetcher import DataFetcher
from datafetcher.return_codes import ReturnCode
from datafetcher.errors import MissingSettingsException, NoProfileException
from datafetcher.locations_report import LocationsReport
from datafetcher.utilities import (
    REQUIRED_SETTINGS,
    get_arg_parser,
    ExecutionSite,
    ProductLocatorLookup,
    RetrievalMode,
)

# Capo profile used when loading settings in this module.
TEST_PROFILE = "docker"
# Numeric/raw "code" entries of the corresponding ReturnCode members, kept
# handy for comparison against what a data-fetcher launch returns.
MISSING_SETTING = ReturnCode.MISSING_SETTING.value["code"]
MISSING_PROFILE = ReturnCode.MISSING_PROFILE.value["code"]
# NOTE(review): not referenced in the visible part of this module; presumably
# a switch consulted by the test modules that import it -- confirm.
RUN_ALL = True

# Locations reports available as test data, keyed by report name.
# Each entry carries:
#   filename      - name of the report's JSON file (see get_locations_file)
#   external_name - external name associated with the report (may be empty)
#   file_count    - presumably the number of files the report lists -- confirm
#   server_count  - presumably the number of servers the report involves -- confirm
LOCATION_REPORTS = {
    "VLA_SMALL_EB": {
        "filename": "VLA_SMALL_EB.json",
        "external_name": "sysstartS.58955.83384832176",
        "file_count": 44,
        "server_count": 2,
    },
    "VLA_LARGE_EB": {
        "filename": "VLA_LARGE_EB.json",
        "external_name": "17B-197.sb34812522.eb35115211.58168.58572621528",
        "file_count": 46,
        "server_count": 2,
    },
    "VLA_BAD_SERVER": {
        "filename": "VLA_BAD_SERVER.json",
        "external_name": "TSKY_20min_B2319_18ms_001.58955.86469591435",
        "file_count": 1,
        "server_count": 1,
    },
    "IMG": {
        "filename": "IMG.json",
        "external_name": "VLASS1.1.ql.T01t01.J000232-383000.10.2048.v1",
        "file_count": 2,
        "server_count": 2,
    },
    "VLBA_EB": {
        "filename": "VLBA_EB.json",
        "external_name": "",
        "file_count": 16,
        "server_count": 1,
    },
    "CALIBRATION": {
        "filename": "CALIBRATION.json",
        "external_name": "18B-265_2019_12_10_T00_00_59.203.tar",
        "file_count": 1,
        "server_count": 1,
    },
}


def get_locations_file(key: str):
    """
    Locate the locations report file registered under a report name.

    :param key: location report name; must be a key of LOCATION_REPORTS
    :return: path of that report's file inside the test data directory

    """

    # Resolve the file name first so an unknown key fails before anything else.
    filename = LOCATION_REPORTS[key]["filename"]
    data_dir = get_test_data_dir()
    return Path(data_dir) / filename


def write_locations_file(destination: Path, locations_report: LocationsReport):
    """
    Dump the "files" entry of a locations report to a JSON file.

    :param destination: where locations file is to be written
    :param locations_report: locations report whose "files" entry is written

    :return: destination, exactly as passed in

    """

    with open(destination, "w") as out_file:
        # Only the "files" entry is persisted; other report keys are dropped.
        json.dump({"files": locations_report["files"]}, fp=out_file, indent=4)
    return destination


def get_mini_exec_block():
    """
    Build a cut-down copy of the VLA_SMALL_EB locations report.

    The copy keeps every key of the original report, but its "files" entry
    holds only the files whose "size" is at most 100000.

    :return: attenuated location report

    """

    full_report = get_locations_report("VLA_SMALL_EB")
    trimmed = full_report.copy()

    small_files = []
    for entry in full_report["files"]:
        if entry["size"] <= 100000:
            small_files.append(entry)

    trimmed["files"] = small_files
    return trimmed


def get_mini_locations_file(destination):
    """
    Write the downsized locations report (large files excised) to a file.

    :param destination: where the downsized locations file is to be written
    :return: destination, exactly as passed in

    """

    # Same on-disk layout as any other locations file: {"files": [...]}.
    return write_locations_file(destination, get_mini_exec_block())


def get_filenames_for_locator(product_locator: str, settings: dict):
    """
    For a given product locator, return the "relative_path" of every file
    listed in its locations report's files report.

    :param product_locator: locator passed as --product-locator
    :param settings: settings handed to LocationsReport
    :return: list of relative paths, in files-report order
    """

    # --output-dir is given None as its value; the profile is always TEST_PROFILE.
    cli_args = ["--product-locator", product_locator]
    cli_args += ["--profile", TEST_PROFILE]
    cli_args += ["--output-dir", None]

    parsed = get_arg_parser().parse_args(cli_args)
    report = LocationsReport(None, parsed, settings)

    names = []
    for entry in report.files_report["files"]:
        names.append(entry["relative_path"])
    return names


def find_newest_fetch_log_file(target_dir: Path):
    """
    Data-fetcher command line was executed, perhaps more than once;
    find the most recent log.

    Searches target_dir recursively for files named DataFetcher*.log and
    picks the one with the greatest os.path.getctime value.

    :param target_dir: location of log file(s)
    :return: path of the newest matching log, or None if there is none
    """

    candidates = [
        Path(dirpath, name)
        for dirpath, _, names in os.walk(target_dir)
        for name in names
        if name.startswith("DataFetcher") and name.endswith(".log")
    ]
    return max(candidates, key=os.path.getctime, default=None)


def get_test_capo_settings():
    """
    Get the capo settings we'll need for the tests.

    Reads every setting named in REQUIRED_SETTINGS from the TEST_PROFILE Capo
    configuration, then adjusts two values so tests can run anywhere.

    :return: dict of setting values, keyed by the REQUIRED_SETTINGS values
    :raises MissingSettingsException: if a required setting cannot be looked
        up, or nothing at all was collected
    """
    capo = CapoConfig(profile=TEST_PROFILE)

    collected = {}
    for raw_name in REQUIRED_SETTINGS:
        capo_key = raw_name.upper()
        try:
            value = capo[capo_key]
            collected[REQUIRED_SETTINGS[capo_key]] = value
        except KeyError as k_err:
            raise MissingSettingsException(
                'missing required setting "{}"'.format(capo_key)
            ) from k_err

    if not collected:
        raise MissingSettingsException("Required Capo settings were not found")

    # be sure execution site is not DSOC nor NAASC
    configured_site = collected["execution_site"]
    if any(
        site.value in configured_site
        for site in (ExecutionSite.DSOC, ExecutionSite.NAASC)
    ):
        collected["execution_site"] = "local_test"

    # be sure download location is accessible:
    # a /lustre download dir is only usable where /lustre actually exists
    download_dir = collected["download_dir"]
    if not Path("/lustre").is_dir():
        if "/lustre" in download_dir:
            collected["download_dir"] = "/var/tmp/"

    return collected


def get_metadata_db_settings(profile):
    """Get Capo settings needed to connect to archive DB

    :param profile: name of the Capo profile to read the settings from
    :return: dict keyed by the fully qualified setting names
        ("metadataDatabase.jdbcDriver", "metadataDatabase.jdbcUrl",
        "metadataDatabase.jdbcUsername", "metadataDatabase.jdbcPassword")
    :raises NoProfileException: if profile is None
    :raises MissingSettingsException: if looking up a setting raises KeyError
    """
    if profile is None:
        raise NoProfileException("CAPO_PROFILE required; none provided")

    # Honor the caller's profile; previously TEST_PROFILE was always used,
    # so the argument was validated and then silently ignored.
    capo = CapoConfig(profile=profile)

    result = {}
    for name in ("jdbcDriver", "jdbcUrl", "jdbcUsername", "jdbcPassword"):
        field = "metadataDatabase." + name
        try:
            result[field] = capo.get(field)
        except KeyError as k_err:
            raise MissingSettingsException(
                f'missing required setting "{field}"'
            ) from k_err
    return result


@pytest.fixture(autouse=True, scope="function")
def make_tempdir() -> Path:
    """
    Creates a new temporary working directory for each test, and removes it
    (with anything left inside) once the test is over.

    NOTE: despite the annotation, the value yielded is the plain str returned
    by tempfile.mkdtemp, not a Path.

    :return: yields the path of a fresh "datafetcher_test_*" directory
        under /var/tmp
    """
    import shutil  # local import: only this fixture's teardown needs it

    # The umask is cleared only while the directory is created; restore it
    # even if creation fails so later tests do not inherit umask 0.
    previous_umask = os.umask(0o000)
    try:
        top_level = tempfile.mkdtemp(prefix="datafetcher_test_", dir="/var/tmp")
    finally:
        os.umask(previous_umask)

    try:
        yield top_level
    finally:
        # mkdtemp never cleans up after itself; without this every test
        # leaves a directory behind in /var/tmp.
        shutil.rmtree(top_level, ignore_errors=True)


@pytest.fixture(scope="session")
def capo_settings():
    """
    Gets the test Capo settings once per test session.

    :return: yields whatever get_test_capo_settings() returns
    """

    yield get_test_capo_settings()


@pytest.fixture(scope="session")
def settings(capo_settings):
    """
    Grabs all the settings we will need for the datafetcher:
        Capo, database, test data

    :param capo_settings: value of the capo_settings fixture
    :return: yields a Settings object bundling the three
    """
    db_settings = get_metadata_db_settings(TEST_PROFILE)
    # test data is looked up in the archive DB, hence needs the DB settings
    yield Settings(capo_settings, db_settings, _initialize_test_data(db_settings))


def _initialize_test_data(db_settings):
    """
    Set up test data for use in several tests: one fixed external name
    together with the product locator looked up for it.

    :param db_settings: settings handed to ProductLocatorLookup
    :return: dict with keys "external_name" and "product_locator"
    """

    ext_name = "13B-014.sb28862036.eb29155786.56782.5720116088"
    lookup = ProductLocatorLookup(db_settings)
    return {
        "external_name": ext_name,
        "product_locator": lookup.look_up_locator_for_ext_name(ext_name),
    }


class Settings:
    """
    Encapsulates some settings for use in tests.

    The three constructor arguments are stored, unmodified, as the
    attributes capo_settings, db_settings and test_data.
    """

    def __init__(self, capo_settings, db_settings, test_data):
        (self.capo_settings, self.db_settings, self.test_data) = (
            capo_settings,
            db_settings,
            test_data,
        )


def launch_datafetcher(args: list, df_capo_settings: dict) -> int:
    """invoke the DF with these args as in df.main(),
    launch it with df.run(),
    and return the appropriate return/error code

    :param args: command-line arguments for the data fetcher
    :param df_capo_settings: Capo settings handed to the DataFetcher
    :return: MISSING_SETTING if args is None or empty; otherwise the result of
        DataFetcher.run(), the exit code of a SystemExit raised along the way,
        or MISSING_PROFILE after a KeyError / NoProfileException.
        Any other exception fails the current test via pytest.fail.
    """
    if not args:
        return MISSING_SETTING

    try:
        namespace = evaluate_args_and_capo(args, df_capo_settings)
        fetcher = DataFetcher(namespace, df_capo_settings)
        return fetcher.run()
    except SystemExit as exc:
        # SystemExit always carries its status in .code and has no .value,
        # so the status can be returned directly.
        return exc.code
    except (KeyError, NoProfileException) as exc:
        logging.error("%s", exc)
        return MISSING_PROFILE
    except Exception as exc:
        pytest.fail(f"{exc}")


def evaluate_args_and_capo(args: list, capo_settings: dict):
    """
    Parse data-fetcher command-line arguments, falling back on the Capo
    settings for the profile when the arguments do not name one.

    Exits (SystemExit) with MISSING_SETTING if there are no arguments, and
    with MISSING_PROFILE if no profile is found in the arguments nor under
    the "profile" key of capo_settings.

    :param args: command-line arguments; not modified
    :param capo_settings: settings consulted for a fallback "profile"
    :return: namespace produced by the data fetcher's argument parser
    """

    if not args:
        sys.exit(MISSING_SETTING)

    # get_profile_from_args() signals "not found" with an empty string, so
    # test truthiness; also accept the single-token "--profile=NAME" form.
    has_profile = bool(get_profile_from_args(args)) or any(
        isinstance(arg, str) and arg.startswith("--profile=") for arg in args
    )
    if not has_profile:
        profile = capo_settings.get("profile") if capo_settings else None
        if not profile:
            sys.exit(MISSING_PROFILE)
        # args is a list of tokens: append the option rather than assigning
        # by key, and build a new list so the caller's list stays untouched.
        args = [*args, "--profile", profile]

    return get_arg_parser().parse_args(args)


def get_profile_from_args(args: list) -> str:
    """
    Find the value that follows the first "--profile" token which has a
    token after it.

    :param args: command-line arguments
    :return: the token following "--profile", or "" if there is none
    """
    # Pair each token with its successor; a trailing "--profile" has no
    # successor and therefore never appears as the first item of a pair.
    for flag, value in zip(args, args[1:]):
        if flag == "--profile":
            return value

    return ""


def confirm_retrieve_mode_copy(servers_report: dict) -> None:
    """
    Assert that every entry of a servers report has a "retrieve_method"
    whose value equals that of RetrievalMode.COPY.

    :param servers_report: servers report, keyed by server
    :raises AssertionError: on the first entry with a different method
    """
    for entry in servers_report.values():
        assert entry["retrieve_method"].value == RetrievalMode.COPY.value