Skip to content
Snippets Groups Projects
Commit 70f47cf8 authored by Charlotte Hausman's avatar Charlotte Hausman Committed by Charlotte Hausman
Browse files

fixing download capability

parent c65e4029
No related branches found
No related tags found
1 merge request!188fixing download capability
Pipeline #1305 failed
......@@ -3,23 +3,16 @@
""" Module for the command line interface to data-fetcher. """
import logging
import os
import sys
from argparse import Namespace
from pathlib import Path
# pylint: disable=C0103, E0402, E0611, R0902, R0903, W0703, W1203
from typing import List, Dict
from datafetcher.errors import (
from datafetcher.errors import MissingSettingsException, NoProfileException
from datafetcher.project_fetcher import ParallelFetcher
from .locations_report import LocationsReport
from .utilities import parse_args, get_capo_settings, path_is_accessible
from .utilities import get_arg_parser, get_capo_settings, path_is_accessible
_APPLICATION_NAME = "datafetcher"
......@@ -48,45 +41,40 @@ class DataFetcher:
def __init__(self, args_in: List[str], df_capo_settings: Dict):
# TODO Some Fine Day: refactor to reduce cognitive complexity
def __init__(self, args: Namespace, df_capo_settings: dict):
self.usage = self._build_usage_message()
if args_in is None or df_capo_settings is None:
if args is None or df_capo_settings is None:
raise MissingSettingsException()
args = parse_args(args_in)
self.args = args
self.settings = df_capo_settings
self.verbose = args.verbose
self.verbose = self.args.verbose
except AttributeError:
# we don't care; --verbose will be dropped later in WS-179
# required arguments
if hasattr(args, "profile"):
self.profile = args.profile
if "CAPO_PROFILE" in os.environ.keys():
self.profile = os.environ["CAPO_PROFILE"]
raise NoProfileException("Capo profile is required")
self.profile = args.profile
if self.profile is None:
raise NoProfileException()
self.output_dir = args.output_dir
if self.output_dir is None:
raise MissingSettingsException("output directory option is missing")
self.output_dir = Path(self.output_dir)
if not self.output_dir.is_dir() or not path_is_accessible(self.output_dir):
raise FileErrorException(f"output location {self.output_dir} inaccessible or not found")
output_dir = Path(self.output_dir)
if not output_dir.is_dir() or not path_is_accessible(output_dir):
raise MissingSettingsException(
f"output location {self.output_dir} inaccessible or not found"
if args.location_file is not None:
if args.product_locator is not None:
raise MissingSettingsException(
"required: location file OR product locator -- not both"
self.location_file = Path(args.location_file)
self.product_locator = None
self.location_file = args.location_file
elif args.product_locator is not None:
self.product_locator = args.product_locator
self.location_file = None
raise MissingSettingsException(
"you must specify either a location file or a product locator"
......@@ -120,16 +108,12 @@ class DataFetcher:
fetcher = ParallelFetcher(
self.output_dir, self.is_dry, self.force, self.settings, self.servers_report
fetcher = ParallelFetcher(self.args, self.settings, self.servers_report)
def _get_locations(self):
capo_settings = get_capo_settings(self.profile)
if self.product_locator:
return LocationsReport(self.product_locator, capo_settings)
return LocationsReport(self.location_file, capo_settings)
return LocationsReport(self.args, capo_settings)
def main():
......@@ -139,16 +123,9 @@ def main():
args = sys.argv
profile = None
if "--profile" in args:
for i in range(0, len(args)):
if args[i] == "--profile":
profile = args[i + 1]
if not profile:
profile = os.environ["CAPO_PROFILE"]
settings = get_capo_settings(profile)
args = get_arg_parser().parse_args()
settings = get_capo_settings(args.profile)
DataFetcher(args, settings).run()
......@@ -33,10 +33,10 @@ class NGASFileRetriever:
and saving it to the requested location.
def __init__(self, output_dir: Path, dry_run: bool, force: bool):
self.output_dir = output_dir
self.dry_run = dry_run
self.force_overwrite = force
def __init__(self, args: Namespace):
self.output_dir = args.output_dir
self.dry_run = args.dry_run
self.force_overwrite = args.force
self.fetch_attempted = False
self.num_tries = 0
......@@ -52,7 +52,7 @@ class NGASFileRetriever:
download_url = "http://" + server + "/RETRIEVE"
destination = self._get_destination(file_spec)
if destination.exists() and not self.force_overwrite and not self.dry_run:
raise FileErrorException(f"{destination} exists; aborting")
raise FileExistsError(f"{destination} exists; aborting")
......@@ -12,8 +12,8 @@ import copy
import http
import json
import logging
from pathlib import Path
from typing import Dict, Union
from argparse import Namespace
from typing import Dict
import requests
......@@ -25,30 +25,31 @@ from .errors import (
from .utilities import Cluster, RetrievalMode, validate_file_spec, parse_args
from .utilities import Cluster, RetrievalMode, validate_file_spec
logger = logging.getLogger(__name__)
class LocationsReport:
Builds a location report from specified .json locations file, or grabs
the report from archiveService using the product locator.
def __init__(self, source: Union[str, Path], settings: Dict):
if isinstance(source, str):
self.product_locator = source
self.location_file = None
elif isinstance(source, Path):
self.product_locator = None
self.location_file = source
""" Builds a location report """
if not self.product_locator and not self.location_file:
def __init__(self, args: Namespace, settings: Dict):
self.verbose = args.verbose or False
except AttributeError:
# doesn't matter; verbose is going away soon
self.verbose = False
self._capture_and_validate_input(args, settings)
def _capture_and_validate_input(self, args, settings):
if args is None:
raise MissingSettingsException(
"either product locator or report file must be specified"
"arguments (locator and/or report file, destination) are required"
self.args = args
if settings is None:
raise MissingSettingsException("CAPO settings are required")
self.settings = settings
......@@ -56,7 +57,10 @@ class LocationsReport:
if not self.settings["execution_site"]:
raise MissingSettingsException("execution_site is required")
self.product_locator = args.product_locator
self.location_file = args.location_file
if not self.product_locator and not self.location_file:
raise NoLocatorException("either product locator or report file must be specified")
def _run(self):
self.files_report = self._get_files_report()
......@@ -130,9 +134,9 @@ class LocationsReport:
:return: location report (from file, in JSON)
result = dict()
if self.location_file:
if self.location_file is not None:
result = self._get_location_report_from_file()
elif self.product_locator is not None:
if self.product_locator is not None:
result = self._get_location_report_from_service()
return self._add_retrieve_method_field(result)
......@@ -21,27 +21,21 @@ logger = logging.getLogger(__name__)
class BaseFetcher:
""" This is a base class for fetchers. """
def __init__(
output_dir: Path,
dry_run: bool,
force: bool,
df_capo_settings: dict,
servers_report: dict,
self.output_dir = output_dir
self.force_overwrite = force
self.dry_run = dry_run
def __init__(self, args: Namespace, df_capo_settings: dict, servers_report: dict):
self.args = args
self.output_dir = self.args.output_dir
self.force_overwrite = args.force
self.dry_run = args.dry_run
self.servers_report = servers_report
self.settings = df_capo_settings
self.ngas_retriever = NGASFileRetriever(self.output_dir, self.dry_run, self.force_overwrite)
self.ngas_retriever = NGASFileRetriever(self.args)
self.retrieved = []
self.num_files_retrieved = 0
def retrieve_files(self, server, retrieve_method, file_specs):
""" This is the part where we actually fetch the files. """
retriever = NGASFileRetriever(self.output_dir, self.dry_run, self.force_overwrite)
retriever = NGASFileRetriever(self.args)
num_files = len(file_specs)
count = 0
......@@ -64,15 +58,8 @@ class SerialFetcher(BaseFetcher):
def __init__(
output_dir: Path,
dry_run: bool,
force: bool,
df_capo_settings: dict,
servers_report: dict,
super().__init__(output_dir, dry_run, force, df_capo_settings, servers_report)
def __init__(self, args: Namespace, df_capo_settings: Dict, servers_report: Dict):
super().__init__(args, df_capo_settings, servers_report)
def run(self):
""" fetch 'em """
......@@ -91,15 +78,8 @@ class SerialFetcher(BaseFetcher):
class ParallelFetcher(BaseFetcher):
""" Pull the files out in parallel; try to be clever about it. """
def __init__(
output_dir: Path,
dry_run: bool,
force: bool,
df_capo_settings: dict,
servers_report: dict,
super().__init__(output_dir, dry_run, force, df_capo_settings, servers_report)
def __init__(self, args: Namespace, df_capo_settings: dict, servers_report: dict):
super().__init__(args, df_capo_settings, servers_report)
self.num_files_expected = self._count_files_expected()
self.bucketized_files = self._bucketize_files()
......@@ -160,7 +140,7 @@ class ParallelFetcher(BaseFetcher):
def run(self):
""" Fetch all the files for the product locator """
if self.dry_run:
if self.args.dry_run:
logger.debug("This is a dry run; files will not be fetched")
with ThreadPoolExecutor() as executor:
......@@ -179,7 +159,7 @@ class ParallelFetcher(BaseFetcher):
# (This error sometimes gets thrown after all files
# actually -have- been retrieved. I blame the NGAS API. - JLG)
output_path = Path(self.output_dir)
output_path = Path(self.args.output_dir)
files = [
for file in output_path.rglob("*")
......@@ -321,63 +321,3 @@ class RetrievalMode(Enum):
STREAM = "stream"
COPY = "copy"
def parse_args(args: List[str]):
Wrapper around parser.parse_args() so in the event of a foo
an exception is thrown rather than sys.exe
:param args:
to_parse = [str(arg) for arg in args]
return get_arg_parser().parse_args(to_parse)
def confirm_complete_args(args: List[str]):
Let's scrutinize the args -before- calling parse_args()
so we can differentiate among errors.
:param args:
# we must have a profile
if "--profile" not in args and "CAPO_PROFILE" not in os.environ.keys():
raise NoProfileException("Capo profile is required.")
# we must have an output dir....
if "--output-dir" not in args:
raise MissingSettingsException("output dir is required")
# ... and it must exist
args_iter = iter(args)
args_dict = dict(zip(args_iter, args_iter))
output_dir_arg = args_dict["--output-dir"]
output_dir = Path(output_dir_arg)
if not output_dir.exists():
# did they just forget to specify it?
if output_dir_arg.startswith("--"):
raise MissingSettingsException("--output-dir is required")
raise FileErrorException(f"output dir '{output_dir_arg}' not found")
if not path_is_accessible(output_dir):
raise FileNotFoundError(f"permission denied on {output_dir}")
# we must have EITHER a product locator OR a locations file....
if "--product-locator" not in args_dict.keys() and "--location-file" not in args_dict.keys():
raise MissingSettingsException("either product locator or locations file required")
if "--product-locator" in args_dict.keys() and "--location-file" in args_dict.keys():
raise MissingSettingsException("product locator OR locations file required -- not both")
if "--product-locator" in args:
product_locator = args_dict["--product-locator"]
assert product_locator
locations_file = args_dict["--location-file"]
if not Path(locations_file).exists():
raise FileNotFoundError(f"{locations_file} not found")
"""removing verbose from download template
Revision ID: 3bed1c92b0b8
Revises: 68d0883785b7
Create Date: 2021-04-14 15:29:23.217461
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3bed1c92b0b8'
down_revision = '68d0883785b7'
branch_labels = None
depends_on = None
def upgrade():
"UPDATE workflow_templates "
"SET content = E'#!/bin/sh\n\ndatafetcher --product-locator $1\ndeliver -r .\n' "
"WHERE filename = ''"
def downgrade():
"UPDATE workflow_templates "
"SET content = E'#!/bin/sh\n\ndatafetcher --verbose --product-locator $1\ndeliver -r .\n' "
"WHERE filename = ''"
# import pytest
# from workspaces.capability.schema import CapabilityExecution
# from import CapabilityEngine
import pytest
import warnings
from sqlalchemy import exc as sa_exc
# pytest_plugins = ["testing.utils.conftest"]
from workspaces.capability.schema import CapabilityExecution
from import CapabilityEngine
pytest_plugins = ["testing.utils.conftest"]
Tests for CapabilityEngine
# @pytest.mark.usefixtures("mock_capability_execution", "mock_capability_engine")
# class TestCapabilityEngine:
# @pytest.mark.skip(reason="Test is hanging; CapabilityEngine needs thread mocking, I think")
# def test_load_engine(
# self,
# mock_capability_engine: CapabilityEngine,
# mock_capability_execution: CapabilityExecution,
# ):
# engine = mock_capability_engine
# engine.load_engine(mock_capability_execution)
# assert engine.execution == mock_capability_execution
# def test_execute(self):
# # TODO: test that current step is prepare and run workflow
# pass
# def test_submit_workflow_request(self):
# # TODO: test that workflow id is in updated execution object
# pass
@pytest.mark.usefixtures("mock_capability_execution", "mock_capability_engine")
class TestCapabilityEngine:
def test_load_engine(
mock_capability_engine: CapabilityEngine,
mock_capability_execution: CapabilityExecution,
with warnings.catch_warnings():
# suppress SQLAlchemy warnings
warnings.simplefilter("ignore", category=sa_exc.SAWarning)
engine = mock_capability_engine
assert engine.execution == mock_capability_execution
def test_execute(self):
# TODO: test that current step is prepare and run workflow
def test_submit_workflow_request(self):
# TODO: test that workflow id is in updated execution object
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment