# df_pytest_utils.py
# Authored by Andrew Kapuscinski
#!/usr/bin/python
# -*- coding: utf-8 -*-
""" Various conveniences for use and re-use in test cases """
import json
import logging
import os
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(".").absolute()))
sys.path.insert(0, str(Path("..").absolute()))
# TODO: Some Fine Day: this duplicates same function in package tester.
# CAVEAT PROGRAMMOR: attempts to centralize it have resulted in tears.
def get_project_root(start: Path = None) -> Path:
    """
    Get the root of this project.

    Walks upward from ``start`` and returns the first path whose final
    component ends with "workspaces" or "packages". ``start`` itself is
    checked first, then each of its ancestors in turn.

    :param start: path to begin the search from; defaults to this file
    :return: the first matching path
    :raises FileNotFoundError: if neither ``start`` nor any ancestor matches
    """
    origin = Path(__file__) if start is None else Path(start)

    # Path.parents is finite, so unlike repeatedly taking ``.parent`` (the
    # parent of the filesystem root is the root itself) this cannot spin
    # forever when no ancestor matches.
    for candidate in (origin, *origin.parents):
        if candidate.name.endswith(("workspaces", "packages")):
            return candidate

    raise FileNotFoundError(
        f"no 'workspaces' or 'packages' directory found above {origin}"
    )
import pytest
from pycapo import CapoConfig
# pylint: disable=C0115, C0116, C0200, R0902, R0903, R0914, R1721, W0212, W0613, W0621, W0703, W1203
sys.path.insert(0, str(get_project_root()))
from shared.workspaces.test.test_data.utilities import (
get_locations_report,
get_test_data_dir,
)
from datafetcher.datafetcher import DataFetcher
from datafetcher.return_codes import ReturnCode
from datafetcher.errors import MissingSettingsException, NoProfileException
from datafetcher.locations_report import LocationsReport
from datafetcher.utilities import (
REQUIRED_SETTINGS,
get_arg_parser,
ExecutionSite,
ProductLocatorLookup,
RetrievalMode,
)
# Capo profile the helpers in this file pass to CapoConfig and to the
# datafetcher argument parser.
TEST_PROFILE = "docker"
# Codes read from the "code" entry of the corresponding ReturnCode members;
# launch_datafetcher() returns these for missing arguments / missing profile.
MISSING_SETTING = ReturnCode.MISSING_SETTING.value["code"]
MISSING_PROFILE = ReturnCode.MISSING_PROFILE.value["code"]
# NOTE(review): not referenced in this part of the file; presumably a switch
# other test modules consult to enable the full suite -- confirm with callers.
RUN_ALL = True
# Canned locations reports, keyed by report name. get_locations_file() looks
# up "filename" here and resolves it against the test data directory.
# NOTE(review): "file_count" / "server_count" look like the expected number of
# files / servers in each report -- confirm against the tests that read them.
LOCATION_REPORTS = {
"VLA_SMALL_EB": {
"filename": "VLA_SMALL_EB.json",
"external_name": "sysstartS.58955.83384832176",
"file_count": 44,
"server_count": 2,
},
"VLA_LARGE_EB": {
"filename": "VLA_LARGE_EB.json",
"external_name": "17B-197.sb34812522.eb35115211.58168.58572621528",
"file_count": 46,
"server_count": 2,
},
"VLA_BAD_SERVER": {
"filename": "VLA_BAD_SERVER.json",
"external_name": "TSKY_20min_B2319_18ms_001.58955.86469591435",
"file_count": 1,
"server_count": 1,
},
"IMG": {
"filename": "IMG.json",
"external_name": "VLASS1.1.ql.T01t01.J000232-383000.10.2048.v1",
"file_count": 2,
"server_count": 2,
},
"VLBA_EB": {
"filename": "VLBA_EB.json",
"external_name": "",
"file_count": 16,
"server_count": 1,
},
"CALIBRATION": {
"filename": "CALIBRATION.json",
"external_name": "18B-265_2019_12_10_T00_00_59.203.tar",
"file_count": 1,
"server_count": 1,
},
}
def get_locations_file(key: str):
    """
    Build the path of the canned locations report registered under ``key``.

    :param key: name of an entry in LOCATION_REPORTS
    :return: path of that entry's file inside the test data directory
    :raises KeyError: if ``key`` is not an entry in LOCATION_REPORTS
    """
    # Look the entry up first so an unknown key fails before anything else runs.
    report_filename = LOCATION_REPORTS[key]["filename"]
    return Path(get_test_data_dir()).joinpath(report_filename)
def write_locations_file(destination: Path, locations_report: LocationsReport):
    """
    Dump the "files" entry of a locations report to a JSON file.

    Only the "files" entry is written; it is wrapped in a one-key object and
    indented by four spaces.

    :param destination: where locations file is to be written
    :param locations_report: locations report from which we'll write the file
    :return: ``destination``, unchanged
    """
    with open(destination, "w") as out_file:
        json.dump({"files": locations_report["files"]}, out_file, indent=4)
    return destination
def get_mini_exec_block():
    """
    Produce a slimmed-down copy of the "VLA_SMALL_EB" locations report.

    The copy is shallow; its "files" entry is replaced with a new list that
    keeps only the files whose "size" is at most 100000.

    :return: attenuated location report
    """
    full_report = get_locations_report("VLA_SMALL_EB")
    trimmed_report = full_report.copy()

    small_files = []
    for entry in full_report["files"]:
        if entry["size"] <= 100000:
            small_files.append(entry)

    trimmed_report["files"] = small_files
    return trimmed_report
def get_mini_locations_file(destination):
    """
    Write a location report file with large files excised.

    The report comes from get_mini_exec_block(); writing is delegated to
    write_locations_file() so both helpers produce the same file layout
    instead of each carrying its own copy of the dump logic.

    :param destination: where the downsized locations file is to be written
    :return: ``destination``, i.e. the downsized location report file
    """
    return write_locations_file(destination, get_mini_exec_block())
def get_filenames_for_locator(product_locator: str, settings: dict):
    """
    List the relative paths of every file in a product locator's
    locations report.

    :param product_locator: locator passed to the parser as --product-locator
    :param settings: settings handed through to LocationsReport
    :return: "relative_path" of each entry in the report's files report
    """
    # NOTE(review): the value given for --output-dir is literally None;
    # presumably the report lookup never touches the output directory.
    cli_args = [
        "--product-locator",
        product_locator,
        "--profile",
        TEST_PROFILE,
        "--output-dir",
        None,
    ]
    parsed = get_arg_parser().parse_args(cli_args)
    report = LocationsReport(None, parsed, settings)

    relative_paths = []
    for entry in report.files_report["files"]:
        relative_paths.append(entry["relative_path"])
    return relative_paths
def find_newest_fetch_log_file(target_dir: Path):
    """
    Locate the most recent data-fetcher log under a directory tree.

    The data-fetcher command line may have been run more than once, leaving
    several logs behind. Candidates are files anywhere below ``target_dir``
    whose name starts with "DataFetcher" and ends with ".log"; the winner is
    the one with the greatest ``os.path.getctime``.

    :param target_dir: location of log file(s)
    :return: path of the newest matching log, or None if there is none
    """
    candidates = [
        Path(dirpath, name)
        for dirpath, _, names in os.walk(target_dir)
        for name in names
        if name.startswith("DataFetcher") and name.endswith(".log")
    ]
    if not candidates:
        return None
    return max(candidates, key=os.path.getctime)
def get_test_capo_settings():
    """
    Get the capo settings we'll need for the tests.

    Each key of REQUIRED_SETTINGS is upper-cased, read from the TEST_PROFILE
    Capo configuration, and stored under the name REQUIRED_SETTINGS maps it
    to. Two values are then adjusted so the tests can run anywhere:
    an execution site containing the DSOC or NAASC site value becomes
    "local_test", and a download directory mentioning "/lustre" becomes
    "/var/tmp/" when no /lustre directory exists on this machine.

    :return: dict of the collected (and adjusted) settings
    :raises MissingSettingsException: if a required setting cannot be looked
        up, or nothing at all was collected
    """
    capo = CapoConfig(profile=TEST_PROFILE)
    collected = {}
    for name in REQUIRED_SETTINGS:
        capo_key = name.upper()
        try:
            collected[REQUIRED_SETTINGS[capo_key]] = capo[capo_key]
        except KeyError as k_err:
            raise MissingSettingsException(
                'missing required setting "{}"'.format(capo_key)
            ) from k_err

    if not collected:
        raise MissingSettingsException("Required Capo settings were not found")

    # be sure execution site is not DSOC nor NAASC
    site = collected["execution_site"]
    if any(
        remote.value in site for remote in (ExecutionSite.DSOC, ExecutionSite.NAASC)
    ):
        collected["execution_site"] = "local_test"

    # be sure download location is accessible
    download_dir = collected["download_dir"]
    lustre_present = Path("/lustre").is_dir()
    if not lustre_present and "/lustre" in download_dir:
        collected["download_dir"] = "/var/tmp/"

    return collected
def get_metadata_db_settings(profile):
    """
    Get Capo settings needed to connect to archive DB.

    Reads the jdbcDriver, jdbcUrl, jdbcUsername and jdbcPassword fields of the
    "metadataDatabase" group and returns them keyed by their qualified names.

    :param profile: must not be None
    :return: dict mapping "metadataDatabase.<field>" to its Capo value
    :raises NoProfileException: if ``profile`` is None
    :raises MissingSettingsException: if a field lookup raises KeyError
    """
    if profile is None:
        raise NoProfileException("CAPO_PROFILE required; none provided")

    # NOTE(review): ``profile`` is only None-checked; the lookup always uses
    # TEST_PROFILE. Confirm whether the argument was meant to be used here.
    capo = CapoConfig(profile=TEST_PROFILE)

    db_settings = {}
    for name in ("jdbcDriver", "jdbcUrl", "jdbcUsername", "jdbcPassword"):
        field = "metadataDatabase." + name
        try:
            db_settings[field] = capo.get(field)
        except KeyError as k_err:
            raise MissingSettingsException(
                f'missing required setting "{field}"'
            ) from k_err
    return db_settings
@pytest.fixture(autouse=True, scope="function")
def make_tempdir():
    """
    Creates a new temporary working directory for each test.

    The directory is created under /var/tmp with the prefix
    "datafetcher_test_" while the process umask is temporarily cleared, and
    is removed again once the test that received it has finished.

    :return: yields the directory's path as the ``str`` returned by
        ``tempfile.mkdtemp`` (not a ``Path``)
    """
    import shutil

    previous_umask = os.umask(0o000)
    try:
        top_level = tempfile.mkdtemp(prefix="datafetcher_test_", dir="/var/tmp")
    finally:
        # restore the umask even if the directory could not be created
        os.umask(previous_umask)

    try:
        yield top_level
    finally:
        # ignore_errors: a test may already have removed the directory itself
        shutil.rmtree(top_level, ignore_errors=True)
@pytest.fixture(scope="session")
def capo_settings():
    """
    Session-scoped fixture: fetches the test Capo settings once and hands the
    same result to every test that asks for it.

    :return: yields whatever get_test_capo_settings() returns
    """
    yield get_test_capo_settings()
@pytest.fixture(scope="session")
def settings(capo_settings):
    """
    Grabs all the settings we will need for the datafetcher:
    Capo, database, test data

    The database settings are read for TEST_PROFILE and then used to look up
    the shared test data; all three pieces are bundled into one Settings
    object.

    :param capo_settings: result of the capo_settings fixture
    :return: yields a Settings instance
    """
    db_settings = get_metadata_db_settings(TEST_PROFILE)
    test_data = _initialize_test_data(db_settings)
    yield Settings(capo_settings, db_settings, test_data)
def _initialize_test_data(db_settings):
    """
    Set up test data for use in several tests.

    Resolves the product locator for one fixed external name through
    ProductLocatorLookup.

    :param db_settings: settings passed to ProductLocatorLookup
    :return: dict with "external_name" and the matching "product_locator"
    """
    external_name = "13B-014.sb28862036.eb29155786.56782.5720116088"
    lookup = ProductLocatorLookup(db_settings)
    return {
        "external_name": external_name,
        "product_locator": lookup.look_up_locator_for_ext_name(external_name),
    }
class Settings:
    """
    Plain holder bundling the three groups of settings the tests share.

    Attributes are stored exactly as given: ``capo_settings``,
    ``db_settings`` and ``test_data``.
    """

    def __init__(self, capo_settings, db_settings, test_data):
        self.capo_settings, self.db_settings, self.test_data = (
            capo_settings,
            db_settings,
            test_data,
        )
def launch_datafetcher(args: list, df_capo_settings: dict) -> int:
    """
    Invoke the DF with these args as in df.main(), launch it with df.run(),
    and return the appropriate return/error code.

    :param args: command-line style arguments for the datafetcher
    :param df_capo_settings: Capo settings handed to the datafetcher
    :return: MISSING_SETTING when ``args`` is None or empty; otherwise the
        result of DataFetcher.run(), the exit code carried by a SystemExit,
        or MISSING_PROFILE on KeyError / NoProfileException. Any other
        exception fails the current test via pytest.fail.
    """
    if not args:
        return MISSING_SETTING

    try:
        parsed = evaluate_args_and_capo(args, df_capo_settings)
        return DataFetcher(parsed, df_capo_settings).run()
    except SystemExit as exit_exc:
        # Prefer a wrapped ``value`` when the exception carries one, then
        # fall back to the exit code itself.
        if hasattr(exit_exc, "value"):
            wrapped = exit_exc.value
            if hasattr(wrapped, "code"):
                return wrapped.code
            return wrapped
        if hasattr(exit_exc, "code"):
            return exit_exc.code
        raise
    except (KeyError, NoProfileException) as profile_exc:
        logging.error(f"{profile_exc}")
        return MISSING_PROFILE
    except Exception as unexpected:
        pytest.fail(f"{unexpected}")
def evaluate_args_and_capo(args: list, capo_settings: dict):
    """
    Parse datafetcher arguments, falling back to Capo for the profile.

    If ``args`` carries no "--profile VALUE" pair, the profile is taken from
    ``capo_settings["profile"]`` and appended to a copy of the arguments
    before parsing; the caller's list is never modified.

    :param args: command-line style arguments for the datafetcher
    :param capo_settings: Capo settings; consulted for "profile" only when
        the arguments do not supply one
    :return: the namespace produced by the datafetcher argument parser
    :raises SystemExit: with MISSING_SETTING when ``args`` is None or empty,
        or with MISSING_PROFILE when no profile is available from either the
        arguments or ``capo_settings``
    """
    if not args:
        sys.exit(MISSING_SETTING)

    # get_profile_from_args() reports "not found" as an empty string, so test
    # for falsiness; comparing against None would never trigger the fallback.
    if not get_profile_from_args(args):
        profile = capo_settings.get("profile")
        if not profile:
            sys.exit(MISSING_PROFILE)
        # args is a list of CLI tokens, so the profile has to be added as a
        # "--profile VALUE" pair rather than assigned by key.
        args = [*args, "--profile", profile]

    return get_arg_parser().parse_args(args)
def get_profile_from_args(args: list) -> str:
    """
    Pull the value that follows the first usable "--profile" flag.

    A "--profile" in the very last position has no value after it and is
    ignored.

    :param args: command-line style arguments
    :return: the element following "--profile", or "" if there is none
    """
    # Pair each argument with its successor; the final argument never appears
    # as a flag because nothing follows it.
    for flag, value in zip(args, args[1:]):
        if flag == "--profile":
            return value
    return ""
def confirm_retrieve_mode_copy(servers_report: dict) -> None:
    """
    Assert that every entry of a servers report uses the COPY retrieval mode.

    :param servers_report: mapping of server to an entry holding a
        "retrieve_method" whose ``value`` is compared against
        RetrievalMode.COPY.value
    :raises AssertionError: on the first entry with a different mode
    """
    for entry in servers_report.values():
        assert entry["retrieve_method"].value == RetrievalMode.COPY.value