Skip to content
Snippets Groups Projects
Commit 35da8c8a authored by Daniel Lyons's avatar Daniel Lyons Committed by Daniel Lyons
Browse files

Support fetching multiple locators or files

parent f982a857
No related branches found
No related tags found
1 merge request!346Support fetching multiple locators or files
Pipeline #2198 passed
This commit is part of merge request !346. Comments created here will be created in the context of that merge request.
......@@ -165,9 +165,21 @@ class NgasFile(LocatedFile):
def __str__(self):
    """Return a human-readable summary: NGAS file id, source server, and target path."""
    return f"NGAS {self.ngas_file_id} from {self.server} -> {self.subdirectory}/{self.relative_path}"
def __eq__(self, other):
    """
    Structural equality: two NgasFile records are equal when every identifying
    field matches.

    Returns NotImplemented (rather than raising AttributeError) when *other*
    is not an NgasFile, so Python can fall back to the other operand's
    __eq__ or to identity comparison.
    """
    if not isinstance(other, type(self)):
        return NotImplemented
    # compare all identifying fields; getattr keeps the field list in one place
    fields = (
        "ngas_file_id",
        "subdirectory",
        "relative_path",
        "checksum",
        "checksum_type",
        "version",
        "size",
        "server",
    )
    return all(getattr(self, name) == getattr(other, name) for name in fields)
class OracleXml(LocatedFile):
""" Represents the metadata of an ALMA SDM stored as XML in the ALMA DB """
"""Represents the metadata of an ALMA SDM stored as XML in the ALMA DB"""
archive_uid: str
table: str
......@@ -195,7 +207,7 @@ class OracleXml(LocatedFile):
class FileLocator(Locator):
""" Loads a locations report from a .json report file """
"""Loads a locations report from a .json report file"""
def __init__(self, file: Path):
    """Remember the path of the .json locations report so locate() can read it later."""
    self.file = file
......@@ -205,7 +217,7 @@ class FileLocator(Locator):
class ServiceLocator(Locator):
""" Acquires a locations report from the archiveService """
"""Acquires a locations report from the archiveService"""
def __init__(self, science_product_locator: str):
    """Remember the science product locator to resolve against the archive service."""
    self.spl = science_product_locator
......@@ -216,9 +228,7 @@ class ServiceLocator(Locator):
# this is needed to prevent SSL errors when tests are run
# inside a Docker container
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ":HIGH:!DH:!aNULL"
requests.Session().mount(
settings.locatorServiceUrlPrefix, adapter=requests.adapters.HTTPAdapter()
)
requests.Session().mount(settings.locatorServiceUrlPrefix, adapter=requests.adapters.HTTPAdapter())
try:
response = requests.get(settings.locatorServiceUrlPrefix, params={"locator": self.spl})
......@@ -237,7 +247,7 @@ class ServiceLocator(Locator):
class NgasServerSchema(Schema):
""" marshmallow schema to interpret "server" section of a "files" item """
"""marshmallow schema to interpret "server" section of a "files" item"""
server = fields.Str()
location = fields.Str()
......@@ -249,7 +259,7 @@ class NgasServerSchema(Schema):
class NgasFileSchema(Schema):
""" One of the items in a location report's "files" list """
"""One of the items in a location report's "files" list"""
ngas_file_id = fields.Str()
subdirectory = fields.Str(allow_none=True)
......@@ -276,7 +286,7 @@ class NgasFileSchema(Schema):
class LocationReportSchema(Schema):
""" Encapsulates an entire locations report """
"""Encapsulates an entire locations report"""
files = fields.List(fields.Nested(NgasFileSchema()))
aggregate_size = fields.Integer()
......
""" Command-line parsing conveniences """
import argparse
import itertools
import sys
from enum import Enum
from pathlib import Path
from typing import List
# pylint: disable=E0401, E0402, R0913
from .exceptions import *
from .exceptions import FetchError
from .fetcher_factory import ConfiguredFetcherFactory
from .fetchers import ForceMode
from .interfaces import FetchPlan, FetcherFactory, FileFetcher
......@@ -23,7 +24,7 @@ class RetrievalMode(Enum):
class CLIParam(Enum):
""" Codifies productfetcher's various command-line parameters """
"""Codifies productfetcher's various command-line parameters"""
SPL = "--product-locator"
FILE = "--location-file"
......@@ -41,18 +42,18 @@ providing the path to a product locator report."""
class FetchContext:
""" Handles the various command-line options """
"""Handles the various command-line options"""
def __init__(
self,
locator: Locator,
locators: List[Locator],
streaming: bool,
direct_copy: bool,
dry_run=False,
force=False,
concurrency=1,
):
self.locator = locator
self.locators = locators
self.streaming = streaming
self.direct_copy = direct_copy
self.dry_run = dry_run
......@@ -66,7 +67,7 @@ class FetchContext:
:return: a plan
"""
# Get the location report
report = self.locator.locate()
reports = [locator.locate() for locator in self.locators]
# Using the arguments we've parsed, generate the fetcher factory
factory = ConfiguredFetcherFactory(
......@@ -75,11 +76,12 @@ class FetchContext:
direct_copy=self.direct_copy,
dry_run=self.dry_run,
force=self.force,
aggregate_size=report.aggregate_size,
aggregate_size=sum([report.aggregate_size for report in reports]),
)
# First we must prepare the fetchers for the location report
all_fetchers = report.fetchers(factory)
# this itertools.chain is a recipe from the itertools documentation for flattening a mapping
all_fetchers = list(itertools.chain.from_iterable(report.fetchers(factory) for report in reports))
# now we have enough stuff to proceed with generating the plan
return self.calculate_plan(factory, all_fetchers)
......@@ -129,17 +131,22 @@ class FetchContext:
:param args:
:return:
"""
namespace = FetchContext.arg_parser().parse_args(args)
parser = FetchContext.arg_parser()
namespace = parser.parse_args(args)
# determine the locator
locator = (
FileLocator(Path(namespace.location_file))
if namespace.location_file
else ServiceLocator(namespace.product_locator)
)
# determine the locators
location_files: List[Locator] = [FileLocator(Path(file)) for file in namespace.location_files]
service_locators: List[Locator] = [ServiceLocator(spl) for spl in namespace.product_locators]
locators = location_files + service_locators
# we can no longer rely on the argument parser to enforce this constraint, so we have to do it
# ourselves here
if len(locators) == 0:
parser.error("You must supply at least one science product locator or location file")
return FetchContext(
locator,
locators,
namespace.streaming,
namespace.direct_copy,
namespace.dry_run,
......@@ -181,19 +188,15 @@ class FetchContext:
default=16,
)
exclusive_group = parser.add_mutually_exclusive_group(required=True)
exclusive_group.add_argument(
CLIParam.SPL.value,
action="store",
dest="product_locator",
help="product locator to download",
parser.add_argument(
CLIParam.SPL.value, action="append", dest="product_locators", help="product locator to download", default=[]
)
exclusive_group.add_argument(
parser.add_argument(
CLIParam.FILE.value,
action="store",
dest="location_file",
action="append",
dest="location_files",
help="product locator report (in JSON)",
default=[],
)
dry_or_force_options = parser.add_mutually_exclusive_group(required=False)
......
......@@ -39,9 +39,7 @@ class FakeFactory(FetcherFactory):
def fetch_oracle_xml(self, file: LocatedFile) -> FileFetcher:
    """Produce a dry-run (no-op) fetcher for an Oracle XML metadata file."""
    return DryRunFakeFileFetcher(file)
def fetch_plan(
self, fetchers: List[Union[FileFetcher, FetchPlan]], concurrency: int = 1
) -> FetchPlan:
def fetch_plan(self, fetchers: List[Union[FileFetcher, FetchPlan]], concurrency: int = 1) -> FetchPlan:
return FakePlan(fetchers, concurrency)
......@@ -66,6 +64,36 @@ def test_plan_generation(resource_path_root):
assert sum(len(subplan.fetchers) for subplan in plan.fetchers) == len(report.files)
def test_multiple_locator_fetching(capsys, resource_path_root):
    """
    Can we handle multiple locators?

    :param capsys:
    :return:
    """
    img = resource_path_root / "location_files" / "IMG.json"
    cal = resource_path_root / "location_files" / "CALIBRATION.json"

    # parse a command line that names both location files
    fc = FetchContext.parse_commandline([CLIParam.FILE.value, str(img), CLIParam.FILE.value, str(cal)])
    assert len(fc.locators) == 2

    # build the plan so we can verify it covers everything from both reports
    plan = fc.generate_plan()

    # re-read each report ourselves to learn which files the plan must cover
    for report_path in (img, cal):
        for expected in FileLocator(report_path).locate().files:
            # there may be a more "test friendly" way of doing this, such as by asking the plan
            # if it is fetching a certain file, but it seems like a lot of refactoring for this
            # one test, so I'm going to leave it alone for now
            assert any(
                expected in [fetcher.file for fetcher in hostgroup.fetchers]
                for hostgroup in plan.fetchers
            )
def test_argument_parsing(capsys):
"""
Can we parse the command-line arguments passed in?
......@@ -78,12 +106,6 @@ def test_argument_parsing(capsys):
# must have an SPL or a file
FetchContext.parse_commandline([])
with pytest.raises(SystemExit):
# cannot have an SPL and a file
FetchContext.parse_commandline(
[CLIParam.FILE.value, "foo", CLIParam.SPL.value, "uid://this/is/a/fakesy"]
)
# check the dry run value
fc = FetchContext.parse_commandline([CLIParam.DRY.value, CLIParam.FILE.value, "foo"])
assert fc.dry_run
......@@ -97,16 +119,14 @@ def test_argument_parsing(capsys):
assert fc.force == ForceMode.NORMAL
fc = FetchContext.parse_commandline([CLIParam.FILE.value, "foo"])
assert isinstance(fc.locator, FileLocator)
assert fc.locator.file == Path("foo")
assert isinstance(fc.locators[0], FileLocator)
assert fc.locators[0].file == Path("foo")
fc = FetchContext.parse_commandline([CLIParam.SPL.value, "uid://this/is/a/fakesy"])
assert isinstance(fc.locator, ServiceLocator)
assert fc.locator.spl == "uid://this/is/a/fakesy"
assert isinstance(fc.locators[0], ServiceLocator)
assert fc.locators[0].spl == "uid://this/is/a/fakesy"
fc = FetchContext.parse_commandline(
[CLIParam.FILE.value, "foo", CLIParam.CONCURRENCY.value, "732"]
)
fc = FetchContext.parse_commandline([CLIParam.FILE.value, "foo", CLIParam.CONCURRENCY.value, "732"])
assert fc.concurrency == 732
capsys.readouterr()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment