#!/usr/bin/python # -*- coding: utf-8 -*- """ Various conveniences for use and re-use in test cases """ import json import logging import os import sys import tempfile from pathlib import Path sys.path.insert(0, str(Path(".").absolute())) sys.path.insert(0, str(Path("..").absolute())) # TODO: Some Fine Day: this duplicates same function in package tester. # CAVEAT PROGRAMMOR: attempts to centralize it have resulted in tears. def get_project_root() -> Path: """ Get the root of this project. :return: """ my_path = Path(__file__) path = my_path while not path.name.endswith("workspaces") and not path.name.endswith("packages"): path = path.parent return path import pytest from pycapo import CapoConfig # pylint: disable=C0115, C0116, C0200, R0902, R0903, R0914, R1721, W0212, W0613, W0621, W0703, W1203 sys.path.insert(0, str(get_project_root())) from shared.workspaces.test.test_data.utilities import ( get_locations_report, get_test_data_dir, ) from datafetcher.datafetcher import DataFetcher from datafetcher.return_codes import ReturnCode from datafetcher.errors import MissingSettingsException, NoProfileException from datafetcher.locations_report import LocationsReport from datafetcher.utilities import ( REQUIRED_SETTINGS, get_arg_parser, ExecutionSite, ProductLocatorLookup, RetrievalMode, ) TEST_PROFILE = "docker" MISSING_SETTING = ReturnCode.MISSING_SETTING.value["code"] MISSING_PROFILE = ReturnCode.MISSING_PROFILE.value["code"] RUN_ALL = True LOCATION_REPORTS = { "VLA_SMALL_EB": { "filename": "VLA_SMALL_EB.json", "external_name": "sysstartS.58955.83384832176", "file_count": 44, "server_count": 2, }, "VLA_LARGE_EB": { "filename": "VLA_LARGE_EB.json", "external_name": "17B-197.sb34812522.eb35115211.58168.58572621528", "file_count": 46, "server_count": 2, }, "VLA_BAD_SERVER": { "filename": "VLA_BAD_SERVER.json", "external_name": "TSKY_20min_B2319_18ms_001.58955.86469591435", "file_count": 1, "server_count": 1, }, "IMG": { "filename": "IMG.json", "external_name": "VLASS1.1.ql.T01t01.J000232-383000.10.2048.v1", "file_count": 2, "server_count": 2, }, "VLBA_EB": { "filename": "VLBA_EB.json", "external_name": "", "file_count": 16, "server_count": 1, }, "CALIBRATION": { "filename": "CALIBRATION.json", "external_name": "18B-265_2019_12_10_T00_00_59.203.tar", "file_count": 1, "server_count": 1, }, } def get_locations_file(key: str): """ Return location report file specified by key :param key: location report name :return: """ report_spec = LOCATION_REPORTS[key] filename = report_spec["filename"] return Path(get_test_data_dir(), filename) def write_locations_file(destination: Path, locations_report: LocationsReport): """ :param destination: where locations file is to be written :param locations_report: locations report from which we'll write the file :return: """ with open(destination, "w") as to_write: to_dump = {"files": locations_report["files"]} json.dump(to_dump, to_write, indent=4) return destination def get_mini_exec_block(): """ Returns a location report with large files excised :return: attenuated location report """ locations_in = get_locations_report("VLA_SMALL_EB") locations_out = locations_in.copy() locations_out["files"] = [ file for file in locations_in["files"] if file["size"] <= 100000 ] return locations_out def get_mini_locations_file(destination): """ Returns a location report file with large files excised :return: downsized location report file """ locations_report = get_mini_exec_block() with open(destination, "w") as to_write: to_dump = {"files": locations_report["files"]} json.dump(to_dump, to_write, indent=4) return destination def get_filenames_for_locator(product_locator: str, settings: dict): """ For a given product locators, return names of all the files in its locations report's files report :param product_locator: :param settings: :return: """ args = [ "--product-locator", product_locator, "--profile", TEST_PROFILE, "--output-dir", None, ] namespace = get_arg_parser().parse_args(args) locations_report = LocationsReport(None, namespace, settings) return [file["relative_path"] for file in locations_report.files_report["files"]] def find_newest_fetch_log_file(target_dir: Path): """ Data-fetcher command line was executed, perhaps more than once; find the most recent log :param target_dir: location of log file(s) :return: """ logfiles = list() for root, _, filenames in os.walk(target_dir): for filename in filenames: if filename.startswith("DataFetcher") and filename.endswith(".log"): logfiles.append(Path(root, filename)) if logfiles: return max(logfiles, key=os.path.getctime) return None def get_test_capo_settings(): """ get the capo settings we'll need for the tests """ capo = CapoConfig(profile=TEST_PROFILE) result = dict() for setting in REQUIRED_SETTINGS: setting = setting.upper() try: result[REQUIRED_SETTINGS[setting]] = capo[setting] except KeyError as k_err: raise MissingSettingsException( 'missing required setting "{}"'.format(setting) ) from k_err if result is None or len(result) == 0: raise MissingSettingsException("Required Capo settings were not found") # be sure execution site is not DSOC nor NAASC exec_site = result["execution_site"] if ExecutionSite.DSOC.value in exec_site or ExecutionSite.NAASC.value in exec_site: result["execution_site"] = "local_test" # be sure download location is accessible dl_loc = result["download_dir"] if not Path("/lustre").is_dir() and "/lustre" in dl_loc: result["download_dir"] = "/var/tmp/" return result def get_metadata_db_settings(profile): """Get Capo settings needed to connect to archive DB :param profile: :return: """ result = dict() if profile is None: raise NoProfileException("CAPO_PROFILE required; none provided") capo = CapoConfig(profile=TEST_PROFILE) fields = ["jdbcDriver", "jdbcUrl", "jdbcUsername", "jdbcPassword"] qualified_fields = ["metadataDatabase." + field for field in fields] for field in qualified_fields: try: result[field] = capo.get(field) except KeyError as k_err: raise MissingSettingsException( f'missing required setting "{field}"' ) from k_err return result @pytest.fixture(autouse=True, scope="function") def make_tempdir() -> Path: """ Creates a new temporary working directory for each test. :return: """ umask = os.umask(0o000) top_level = tempfile.mkdtemp(prefix="datafetcher_test_", dir="/var/tmp") os.umask(umask) yield top_level @pytest.fixture(scope="session") def capo_settings(): """ Gets Capo settings once for whole module. :return: """ def retrieve_capo_settings() -> CapoConfig: return get_test_capo_settings() to_return = retrieve_capo_settings() yield to_return @pytest.fixture(scope="session") def settings(capo_settings): """ Grabs all the settings we will need for the datafetcher: Capo, database, test data :param capo_settings: :return: """ """ g """ db_settings = get_metadata_db_settings(TEST_PROFILE) test_data = _initialize_test_data(db_settings) yield Settings(capo_settings, db_settings, test_data) def _initialize_test_data(db_settings): """ Set up test data for use in several tests :param db_settings: :return: """ ext_name = "13B-014.sb28862036.eb29155786.56782.5720116088" product_locator = ProductLocatorLookup(db_settings).look_up_locator_for_ext_name( ext_name ) return {"external_name": ext_name, "product_locator": product_locator} class Settings: """ Encapsulates some settings for use in tests """ def __init__(self, capo_settings, db_settings, test_data): self.capo_settings = capo_settings self.db_settings = db_settings self.test_data = test_data def launch_datafetcher(args: list, df_capo_settings: dict) -> int: """invoke the DF with these args as in df.main(), launch it with df.run(), and return the appropriate return/error code """ if args is None or len(args) == 0: return MISSING_SETTING try: namespace = evaluate_args_and_capo(args, df_capo_settings) fetcher = DataFetcher(namespace, df_capo_settings) return fetcher.run() except SystemExit as exc: if hasattr(exc, "value"): return exc.value.code if hasattr(exc.value, "code") else exc.value if hasattr(exc, "code"): return exc.code raise except (KeyError, NoProfileException) as exc: logging.error(f"{exc}") return MISSING_PROFILE except Exception as exc: pytest.fail(f"{exc}") def evaluate_args_and_capo(args: list, capo_settings: dict): if args is None or len(args) == 0: sys.exit(MISSING_SETTING) profile = get_profile_from_args(args) if profile is None: profile = capo_settings["profile"] if profile is None: sys.exit(MISSING_PROFILE) else: args["profile"] = profile namespace = get_arg_parser().parse_args(args) return namespace def get_profile_from_args(args: list) -> str: for i in range(0, len(args)): if args[i] == "--profile" and i < len(args) - 1: profile = args[i + 1] return profile return "" def confirm_retrieve_mode_copy(servers_report: dict) -> None: for server in servers_report: entry = servers_report[server] assert entry["retrieve_method"].value == RetrievalMode.COPY.value