diff --git a/apps/cli/executables/datafetcher/datafetcher/datafetcher.py b/apps/cli/executables/datafetcher/datafetcher/datafetcher.py index 7cd47587b404eab95c28f7f4601dfa3262b664e9..962e5472421bed92e5fc1c067a34d1d9362e3636 100755 --- a/apps/cli/executables/datafetcher/datafetcher/datafetcher.py +++ b/apps/cli/executables/datafetcher/datafetcher/datafetcher.py @@ -3,23 +3,16 @@ """ Module for the command line interface to data-fetcher. """ import logging -import os -import sys - +from argparse import Namespace from pathlib import Path # pylint: disable=C0103, E0402, E0611, R0902, R0903, W0703, W1203 -from typing import List, Dict -from datafetcher.errors import ( - MissingSettingsException, - NoProfileException, - FileErrorException, -) +from datafetcher.errors import MissingSettingsException, NoProfileException from datafetcher.project_fetcher import ParallelFetcher from .locations_report import LocationsReport -from .utilities import parse_args, get_capo_settings, path_is_accessible +from .utilities import get_arg_parser, get_capo_settings, path_is_accessible _APPLICATION_NAME = "datafetcher" @@ -48,45 +41,40 @@ class DataFetcher: """ - def __init__(self, args_in: List[str], df_capo_settings: Dict): + # TODO Some Fine Day: refactor to reduce cognitive complexity + def __init__(self, args: Namespace, df_capo_settings: dict): self.usage = self._build_usage_message() - if args_in is None or df_capo_settings is None: + if args is None or df_capo_settings is None: raise MissingSettingsException() - args = parse_args(args_in) + self.args = args self.settings = df_capo_settings try: - self.verbose = args.verbose + self.verbose = self.args.verbose except AttributeError: # we don't care; --verbose will be dropped later in WS-179 pass # required arguments - if hasattr(args, "profile"): - self.profile = args.profile - else: - if "CAPO_PROFILE" in os.environ.keys(): - self.profile = os.environ["CAPO_PROFILE"] - else: - raise NoProfileException("Capo profile is required") - + self.profile = args.profile + if self.profile is None: + raise NoProfileException() self.output_dir = args.output_dir if self.output_dir is None: raise MissingSettingsException("output directory option is missing") - - self.output_dir = Path(self.output_dir) - if not self.output_dir.is_dir() or not path_is_accessible(self.output_dir): - raise FileErrorException(f"output location {self.output_dir} inaccessible or not found") + output_dir = Path(self.output_dir) + if not output_dir.is_dir() or not path_is_accessible(output_dir): + raise MissingSettingsException( + f"output location {self.output_dir} inaccessible or not found" + ) if args.location_file is not None: if args.product_locator is not None: raise MissingSettingsException( "required: location file OR product locator -- not both" ) - self.location_file = Path(args.location_file) - self.product_locator = None + self.location_file = args.location_file elif args.product_locator is not None: self.product_locator = args.product_locator - self.location_file = None else: raise MissingSettingsException( "you must specify either a location file or a product locator" @@ -120,16 +108,12 @@ class DataFetcher: :return: """ - fetcher = ParallelFetcher( - self.output_dir, self.is_dry, self.force, self.settings, self.servers_report - ) + fetcher = ParallelFetcher(self.args, self.settings, self.servers_report) fetcher.run() def _get_locations(self): capo_settings = get_capo_settings(self.profile) - if self.product_locator: - return LocationsReport(self.product_locator, capo_settings) - return LocationsReport(self.location_file, capo_settings) + return LocationsReport(self.args, capo_settings) def main(): @@ -139,16 +123,9 @@ def main(): logging.basicConfig(level=logging.DEBUG) - args = sys.argv - profile = None - if "--profile" in args: - for i in range(0, len(args)): - if args[i] == "--profile": - profile = args[i + 1] - break - if not profile: - profile = os.environ["CAPO_PROFILE"] - settings = get_capo_settings(profile) + args = get_arg_parser().parse_args() + + settings = get_capo_settings(args.profile) DataFetcher(args, settings).run() diff --git a/apps/cli/executables/datafetcher/datafetcher/file_retrievers.py b/apps/cli/executables/datafetcher/datafetcher/file_retrievers.py index ae6c50ade5bc046067dafdf88f2ef4bfaf8f6877..a509e19ae8cc4ac70094e62adcd8a1494eb6b9f7 100644 --- a/apps/cli/executables/datafetcher/datafetcher/file_retrievers.py +++ b/apps/cli/executables/datafetcher/datafetcher/file_retrievers.py @@ -33,10 +33,10 @@ class NGASFileRetriever: and saving it to the requested location. """ - def __init__(self, output_dir: Path, dry_run: bool, force: bool): - self.output_dir = output_dir - self.dry_run = dry_run - self.force_overwrite = force + def __init__(self, args: Namespace): + self.output_dir = args.output_dir + self.dry_run = args.dry_run + self.force_overwrite = args.force self.fetch_attempted = False self.num_tries = 0 @@ -52,7 +52,7 @@ class NGASFileRetriever: download_url = "http://" + server + "/RETRIEVE" destination = self._get_destination(file_spec) if destination.exists() and not self.force_overwrite and not self.dry_run: - raise FileErrorException(f"{destination} exists; aborting") + raise FileExistsError(f"{destination} exists; aborting") self._make_basedir(destination) diff --git a/apps/cli/executables/datafetcher/datafetcher/locations_report.py b/apps/cli/executables/datafetcher/datafetcher/locations_report.py index 8f6fc9d546ae3db5f4c07c8918a9bb8a2d91081f..f8e751ee6d1da1b4cd6f993624d9717f10087322 100644 --- a/apps/cli/executables/datafetcher/datafetcher/locations_report.py +++ b/apps/cli/executables/datafetcher/datafetcher/locations_report.py @@ -12,8 +12,8 @@ import copy import http import json import logging -from pathlib import Path -from typing import Dict, Union +from argparse import Namespace +from typing import Dict import requests @@ -25,30 +25,31 @@ from .errors import ( NoLocatorException, MissingSettingsException, ) -from .utilities import Cluster, RetrievalMode, validate_file_spec, parse_args +from .utilities import Cluster, RetrievalMode, validate_file_spec logger = logging.getLogger(__name__) class LocationsReport: - """ - Builds a location report from specified .json locations file, or grabs - the report from archiveService using the product locator. - """ - - def __init__(self, source: Union[str, Path], settings: Dict): - if isinstance(source, str): - self.product_locator = source - self.location_file = None - elif isinstance(source, Path): - self.product_locator = None - self.location_file = source + """ Builds a location report """ - if not self.product_locator and not self.location_file: + def __init__(self, args: Namespace, settings: Dict): + + try: + self.verbose = args.verbose or False + except AttributeError: + # doesn't matter; verbose is going away soon + self.verbose = False + + self._capture_and_validate_input(args, settings) + self._run() + + def _capture_and_validate_input(self, args, settings): + if args is None: raise MissingSettingsException( - "either product locator or report file must be specified" + "arguments (locator and/or report file, destination) are required" ) - + self.args = args if settings is None: raise MissingSettingsException("CAPO settings are required") self.settings = settings @@ -56,7 +57,10 @@ class LocationsReport: if not self.settings["execution_site"]: raise MissingSettingsException("execution_site is required") - self._run() + self.product_locator = args.product_locator + self.location_file = args.location_file + if not self.product_locator and not self.location_file: + raise NoLocatorException("either product locator or report file must be specified") def _run(self): self.files_report = self._get_files_report() @@ -130,9 +134,9 @@ class LocationsReport: :return: location report (from file, in JSON) """ result = dict() - if self.location_file: + if self.location_file is not None: result = self._get_location_report_from_file() - elif self.product_locator is not None: + if self.product_locator is not None: result = self._get_location_report_from_service() return self._add_retrieve_method_field(result) diff --git a/apps/cli/executables/datafetcher/datafetcher/project_fetcher.py b/apps/cli/executables/datafetcher/datafetcher/project_fetcher.py index 497640f066971df349d4ea2fb46232365e83227f..377ca6081d916774220b211b7a34a03812654ed8 100644 --- a/apps/cli/executables/datafetcher/datafetcher/project_fetcher.py +++ b/apps/cli/executables/datafetcher/datafetcher/project_fetcher.py @@ -21,27 +21,21 @@ logger = logging.getLogger(__name__) class BaseFetcher: """ This is a base class for fetchers. """ - def __init__( - self, - output_dir: Path, - dry_run: bool, - force: bool, - df_capo_settings: dict, - servers_report: dict, - ): - self.output_dir = output_dir - self.force_overwrite = force - self.dry_run = dry_run + def __init__(self, args: Namespace, df_capo_settings: dict, servers_report: dict): + self.args = args + self.output_dir = self.args.output_dir + self.force_overwrite = args.force + self.dry_run = args.dry_run self.servers_report = servers_report self.settings = df_capo_settings - self.ngas_retriever = NGASFileRetriever(self.output_dir, self.dry_run, self.force_overwrite) + self.ngas_retriever = NGASFileRetriever(self.args) self.retrieved = [] self.num_files_retrieved = 0 def retrieve_files(self, server, retrieve_method, file_specs): """ This is the part where we actually fetch the files. """ - retriever = NGASFileRetriever(self.output_dir, self.dry_run, self.force_overwrite) + retriever = NGASFileRetriever(self.args) num_files = len(file_specs) count = 0 @@ -64,15 +58,8 @@ class SerialFetcher(BaseFetcher): """ - def __init__( - self, - output_dir: Path, - dry_run: bool, - force: bool, - df_capo_settings: dict, - servers_report: dict, - ): - super().__init__(output_dir, dry_run, force, df_capo_settings, servers_report) + def __init__(self, args: Namespace, df_capo_settings: Dict, servers_report: Dict): + super().__init__(args, df_capo_settings, servers_report) def run(self): """ fetch 'em """ @@ -91,15 +78,8 @@ class SerialFetcher(BaseFetcher): class ParallelFetcher(BaseFetcher): """ Pull the files out in parallel; try to be clever about it. """ - def __init__( - self, - output_dir: Path, - dry_run: bool, - force: bool, - df_capo_settings: dict, - servers_report: dict, - ): - super().__init__(output_dir, dry_run, force, df_capo_settings, servers_report) + def __init__(self, args: Namespace, df_capo_settings: dict, servers_report: dict): + super().__init__(args, df_capo_settings, servers_report) self.num_files_expected = self._count_files_expected() self.bucketized_files = self._bucketize_files() @@ -160,7 +140,7 @@ class ParallelFetcher(BaseFetcher): def run(self): """ Fetch all the files for the product locator """ - if self.dry_run: + if self.args.dry_run: logger.debug("This is a dry run; files will not be fetched") with ThreadPoolExecutor() as executor: @@ -179,7 +159,7 @@ class ParallelFetcher(BaseFetcher): # (This error sometimes gets thrown after all files # actually -have- been retrieved. I blame the NGAS API. - JLG) - output_path = Path(self.output_dir) + output_path = Path(self.args.output_dir) files = [ file for file in output_path.rglob("*") diff --git a/apps/cli/executables/datafetcher/datafetcher/utilities.py b/apps/cli/executables/datafetcher/datafetcher/utilities.py index fce3d1bb3a008faf3332e0ef396d77fd2bc1d63a..aaa8cda1d36f659ce33475eb237ff7fc93d0cca4 100644 --- a/apps/cli/executables/datafetcher/datafetcher/utilities.py +++ b/apps/cli/executables/datafetcher/datafetcher/utilities.py @@ -321,63 +321,3 @@ class RetrievalMode(Enum): STREAM = "stream" COPY = "copy" - - -def parse_args(args: List[str]): - """ - Wrapper around parser.parse_args() so in the event of a foo - an exception is thrown rather than sys.exe - - :param args: - :return: - """ - confirm_complete_args(args) - to_parse = [str(arg) for arg in args] - return get_arg_parser().parse_args(to_parse) - - -def confirm_complete_args(args: List[str]): - """ - Let's scrutinize the args -before- calling parse_args() - so we can differentiate among errors. - - :param args: - :return: - """ - - # we must have a profile - if "--profile" not in args and "CAPO_PROFILE" not in os.environ.keys(): - raise NoProfileException("Capo profile is required.") - - # we must have an output dir.... - if "--output-dir" not in args: - raise MissingSettingsException("output dir is required") - - # ... and it must exist - args_iter = iter(args) - args_dict = dict(zip(args_iter, args_iter)) - - output_dir_arg = args_dict["--output-dir"] - - output_dir = Path(output_dir_arg) - if not output_dir.exists(): - # did they just forget to specify it? - if output_dir_arg.startswith("--"): - raise MissingSettingsException("--output-dir is required") - raise FileErrorException(f"output dir '{output_dir_arg}' not found") - if not path_is_accessible(output_dir): - raise FileNotFoundError(f"permission denied on {output_dir}") - - # we must have EITHER a product locator OR a locations file.... - if "--product-locator" not in args_dict.keys() and "--location-file" not in args_dict.keys(): - raise MissingSettingsException("either product locator or locations file required") - if "--product-locator" in args_dict.keys() and "--location-file" in args_dict.keys(): - raise MissingSettingsException("product locator OR locations file required -- not both") - - if "--product-locator" in args: - product_locator = args_dict["--product-locator"] - assert product_locator - else: - locations_file = args_dict["--location-file"] - if not Path(locations_file).exists(): - raise FileNotFoundError(f"{locations_file} not found") diff --git a/schema/versions/3bed1c92b0b8_removing_verbose_from_download_template.py b/schema/versions/3bed1c92b0b8_removing_verbose_from_download_template.py new file mode 100644 index 0000000000000000000000000000000000000000..d6aebc11432b01d0256fb121f431f10c2e892af2 --- /dev/null +++ b/schema/versions/3bed1c92b0b8_removing_verbose_from_download_template.py @@ -0,0 +1,32 @@ +"""removing verbose from download template + +Revision ID: 3bed1c92b0b8 +Revises: 68d0883785b7 +Create Date: 2021-04-14 15:29:23.217461 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '3bed1c92b0b8' +down_revision = '68d0883785b7' +branch_labels = None +depends_on = None + + +def upgrade(): + op.execute( + "UPDATE workflow_templates " + "SET content = E'#!/bin/sh\n\ndatafetcher --product-locator $1\ndeliver -r .\n' " + "WHERE filename = 'test_download.sh'" + ) + + +def downgrade(): + op.execute( + "UPDATE workflow_templates " + "SET content = E'#!/bin/sh\n\ndatafetcher --verbose --product-locator $1\ndeliver -r .\n' " + "WHERE filename = 'test_download.sh'" + ) diff --git a/shared/workspaces/test/test_capability_engine.py b/shared/workspaces/test/test_capability_engine.py index fbdee59bc0ee8005212e722522cc5034eb54d0b8..5b87ec43be85380b836e2b1a1d1284236783817a 100644 --- a/shared/workspaces/test/test_capability_engine.py +++ b/shared/workspaces/test/test_capability_engine.py @@ -1,32 +1,38 @@ -# import pytest -# -# from workspaces.capability.schema import CapabilityExecution -# from workspaces.capability.services.capability_engine import CapabilityEngine +import pytest +import warnings +from sqlalchemy import exc as sa_exc -# pytest_plugins = ["testing.utils.conftest"] +from workspaces.capability.schema import CapabilityExecution +from workspaces.capability.services.capability_engine import CapabilityEngine + +pytest_plugins = ["testing.utils.conftest"] """ Tests for CapabilityEngine """ -# @pytest.mark.usefixtures("mock_capability_execution", "mock_capability_engine") -# class TestCapabilityEngine: -# @pytest.mark.skip(reason="Test is hanging; CapabilityEngine needs thread mocking, I think") -# def test_load_engine( -# self, -# mock_capability_engine: CapabilityEngine, -# mock_capability_execution: CapabilityExecution, -# ): -# engine = mock_capability_engine -# engine.load_engine(mock_capability_execution) -# -# assert engine.execution == mock_capability_execution -# -# def test_execute(self): -# # TODO: test that current step is prepare and run workflow -# pass -# -# def test_submit_workflow_request(self): -# # TODO: test that workflow id is in updated execution object -# pass +@pytest.mark.usefixtures("mock_capability_execution", "mock_capability_engine") +class TestCapabilityEngine: + + def test_load_engine( + self, + mock_capability_engine: CapabilityEngine, + mock_capability_execution: CapabilityExecution, + ): + with warnings.catch_warnings(): + # suppress SQLAlchemy warnings + warnings.simplefilter("ignore", category=sa_exc.SAWarning) + + engine = mock_capability_engine + engine.load_engine(mock_capability_execution) + + assert engine.execution == mock_capability_execution + + def test_execute(self): + # TODO: test that current step is prepare and run workflow + pass + + def test_submit_workflow_request(self): + # TODO: test that workflow id is in updated execution object + pass