diff --git a/apps/cli/executables/datafetcher/ARCHITECTURE.md b/apps/cli/executables/datafetcher/ARCHITECTURE.md new file mode 100644 index 0000000000000000000000000000000000000000..f6e475bc55bc5731761f910789a669eb25df3904 --- /dev/null +++ b/apps/cli/executables/datafetcher/ARCHITECTURE.md @@ -0,0 +1,138 @@ +# Architecture of the Product Fetcher

The product fetcher is designed to retrieve data products from our archives. As the name suggests, the input to the
product fetcher is a _science product locator_, a string like
`uid://evla/execblock/27561b56-4c6a-4614-bc26-67e436b5e92c`. The science product locator is decoded by a service called
the locator service, which uses the archive's knowledge of different instruments and their storage locations to produce
something called a location report. The location report contains a list of files that are associated with the science
product, and information about where they can be obtained. The job of the product fetcher is to interpret the report and
retrieve the files from wherever they may be.

The goals for the product fetcher are:

- Accuracy: retrieving the files correctly, including retrying as necessary and verifying file content
- Speed: retrieving the files as quickly as possible without sacrificing accuracy

Because the work is mostly I/O bound and accesses many servers, the product fetcher depends on a high degree of
concurrency to achieve speed.

## Map

I divide the fetching process into two stages. In the first stage, we're generating a plan; in the second stage, we're
executing the plan. The "meat" of the program and the bulk of the time and effort takes place in the second stage
and is built out of the following pieces:

### FileFetcher

The core of the program is what happens inside a FileFetcher. A FileFetcher retrieves a single file. There are several
different ways files can be stored, and there is a FileFetcher for each storage medium and access method. At the moment,
this means there are three implementations of FileFetcher:

- NgasStreamingFileFetcher, which does a web request against an NGAS resource and writes the result to disk
- NgasDirectCopyFileFetcher, which asks NGAS to write a resource to a certain path on disk
- OracleXmlFetcher, which queries Oracle for a value in a certain row of a certain table and writes the result to a
  certain path on disk

FileFetchers have a simple API: you provide them with a file and some validations to run, and they fetch the file
and then run the validations. Because the design is fairly simple, we have a few utility FileFetchers:

- RetryableFileFetcher, which retries another file fetcher a certain number of times, to increase our fault tolerance
- DryRunFakeFileFetcher, which is used in "dry run" mode to simply print what would be fetched, and in the unit tests
- NgasModeDetectingFetcher, which is used when the user has no preference for an NGAS access method and just
  wants the program to look at the file and the destination and make a decision
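To make the FileFetcher contract concrete, here is a minimal sketch of composing and running one (the class names are
the real ones above; `file` stands for whatever entry came out of the location report, and the validators are
described in the next section):

    fetcher = RetryableFileFetcher(NgasStreamingFileFetcher(file))
    fetcher.and_validate_with(SizeValidator(file.size))
    fetcher.fetch()  # fetch the file, then run each attached validator on it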
### FileValidator

After a file is fetched, we can do some analysis on the file to make sure that it was correctly retrieved. Files
that are stored in NGAS have some associated information we can utilize: the size and a CRC32 checksum. Files stored
in an Oracle database are XML and can be checked for well-formedness, as well as their size. These are the three
validators currently supported:

- Crc32Validator, which checks the CRC32 checksum value from NGAS
- SizeValidator, which ensures that the files on disk have the size we anticipated
- XmlWellFormedValidator, which checks that the file is well-formed XML (not valid, which would require a schema)

## Planning

The first stage of the program consists of building a fetch plan. A FetchPlan is, in a trivial sense, a list of
FileFetchers that need to be executed. So the first thing we need to do is get a location report and ask it to make
us a list of fetchers.

### LocationReport

The location report represents the report we get back from the locator service. It just contains a list of locations,
which may be NGAS files or Oracle XML tables, along with the information needed to validate each retrieval and where
to place the files relative to some destination directory. That's it.

### Locator

LocationReports come to us from two sources: from an archive REST service ("the locator service"), or from a file on
disk given to us at the start of execution, which is usually for testing but could occur in the wild if we needed to
utilize a fetch plan without access to the locator service. Because we have two ways to obtain LocationReports, we
have an interface for Locators and two implementations:

- FileLocator: given a file, it parses the file to obtain the LocationReport
- ServiceLocator: given a science product locator, it queries the service to obtain the LocationReport

### FetcherFactory

Earlier in the design I mentioned that we need to get FileFetchers from the LocationReport. This is actually a
responsibility of the LocationReport itself, because it knows what kinds of entries appear in the report. But users
can specify constraints on the details of how a file can be fetched, such as requiring NGAS streaming or doing a dry
run. The LocationReport needs to generate FileFetchers, but its choices need to be parameterized somehow by user input.

The solution is simple: the LocationReport relies on a FetcherFactory. The FetcherFactory provides an API to make
the kinds of fetchers we need to make. The LocationReport asks the FetcherFactory to make fetchers for the different
flavors of file it has. FetcherFactory is implemented just once, with ConfiguredFetcherFactory, which is
parameterized by various settings from the command line arguments. ConfiguredFetcherFactory knows, for example, that
if the user asked for a dry run, it should hand the LocationReport a DryRunFakeFileFetcher instead of (say) an
NgasStreamingFileFetcher.

### FetchPlan

A FetchPlan is really a way of aggregating FileFetchers together. It turns out that the API for a FetchPlan is the
same as the API for a FileFetcher, except we don't have a return value from `fetch`. But, knowing that we
have a list of FileFetchers to execute, we can execute them in parallel. This gives us a performance benefit.

There's one implementation of FetchPlan in the system, which is ParallelFetchPlan. The ParallelFetchPlan is
parameterized by a concurrency level. Internally, it works by using a concurrent.futures ThreadPoolExecutor to run
tasks in parallel. Each FileFetcher is a task, so if you have (say) 300 FileFetchers and a concurrency level of 16,
you will see 16 of those fetches running at a time until the list is exhausted. Because plans and fetchers share the
same `fetch` API, plans can also contain other plans, as sketched below.
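A sketch of that nesting (the fetcher lists here are hypothetical placeholders):

    ngas_plan = ParallelFetchPlan(ngas_fetchers, concurrency=4)
    sdm_plan = ParallelFetchPlan(sdm_fetchers, concurrency=4)
    overall = ParallelFetchPlan([ngas_plan, sdm_plan], concurrency=2)
    overall.fetch()  # runs both sub-plans at once, each fetching up to 4 files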
### FetchContext

What's left to be done is the handling of optional flags and the generation of the FetchPlan. It turns out to be
unwise to exceed a certain level of parallelism on a single server. In the past, files were grouped by server, but a
simpler mechanism that achieves the same end is used here: each file can be asked whether it shares resources with
another file. This would be true of NGAS fetches if they talk to the same NGAS host, and it's true of all OracleXml
fetches because all of them access the same database connection. The FetchContext is responsible for generating the
fetch plan by taking the file groups established in this way and making a FetchPlan for each group, and then
wrapping each of those FetchPlans in another overall FetchPlan:

    FetchPlan (overall)
    ├── FetchPlan (server 1)
    │   ├── FetchNgasFile #1
    │   ├── FetchNgasFile #2
    │   ├── FetchNgasFile #3
    │   ├── ...
    ├── FetchPlan (server 2)
    │   ├── FetchNgasFile #1
    │   ├── FetchNgasFile #2
    │   ├── FetchNgasFile #3
    │   ├── ...
    ├── FetchPlan (ALMA SDMs)
    │   ├── FetchAlmaSdm #1
    │   ├── FetchAlmaSdm #2
    │   ├── ...

If parallel fetching is enabled, the overall FetchPlan will launch (in this example)
three threads, each of which will then launch (say) 4 threads to conduct its own
fetching, resulting in up to 12 concurrent fetches. The fetch should be pretty fast.
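In code, the grouping step reduces to something like the following sketch (the real logic lives in
`FetchContext.calculate_plan`; `all_fetchers` and `factory` are assumed to be in hand already):

    groups = {}
    for fetcher in all_fetchers:
        for prototype in groups:
            if fetcher.shares_resources_with(prototype):
                groups[prototype].append(fetcher)  # joins an existing group
                break
        else:
            groups[fetcher] = [fetcher]  # first of its kind: becomes a prototype

    plans = [factory.fetch_plan(group) for group in groups.values()]
    overall = factory.fetch_plan(plans, len(plans))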
FetchContext also parses the command line arguments, which it feeds into the ConfiguredFetcherFactory.

The main body of the program is then simply this:

    context = FetchContext.parse_commandline(args)
    plan = context.generate_plan()
    plan.fetch()
diff --git a/apps/cli/executables/datafetcher/datafetcher/alma_db_utils.py b/apps/cli/executables/datafetcher/datafetcher/alma_db_utils.py index 00d3e093ddb8c3f9f7889494ab6a71c38d9fcb11..2d2832818e4c4bf228ccda865f24c3c156cf7959 100644 --- a/apps/cli/executables/datafetcher/datafetcher/alma_db_utils.py +++ b/apps/cli/executables/datafetcher/datafetcher/alma_db_utils.py @@ -52,7 +52,7 @@ class AlmaDbConnector: :return: """ - capo_config = CapoConfig(profile="local") + capo_config = CapoConfig() user = capo_config[ALMA_CAPO_PREFIX + "Username"] passwd = capo_config[ALMA_CAPO_PREFIX + "Password"] url = capo_config[ALMA_CAPO_PREFIX + "Url"] diff --git a/apps/cli/executables/datafetcher/datafetcher/redesign/__init__.py b/apps/cli/executables/datafetcher/datafetcher/redesign/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/apps/cli/executables/datafetcher/datafetcher/redesign/exceptions.py b/apps/cli/executables/datafetcher/datafetcher/redesign/exceptions.py new file mode 100644 index 0000000000000000000000000000000000000000..79f899e07d6a6899fe324b19b7e329f2768d44ee --- /dev/null +++ b/apps/cli/executables/datafetcher/datafetcher/redesign/exceptions.py @@ -0,0 +1,47 @@ +"""
Our custom errors.

We create a simple taxonomy of errors: they are all FetchErrors, but some are RetryableFetchErrors.
The distinction matters to the RetryableFileFetcher, which will re-attempt a fetch only
if it is interrupted by a RetryableFetchError.

"""


class FetchError(Exception):
    """
    Some sort of problem has occurred, and the program must end.
    """


class RetryableFetchError(FetchError):
    """
    A problem has occurred, but one that may go away if you try
    again in a few seconds.
    """


class FileValidationFault(FetchError):
    """
    A file has failed to validate for some reason. This is not a retryable error.
    """


class UnsatisfiableConstraint(FetchError):
    """
    Something the user has asked for is impossible. For instance, insisting
    on an NGAS direct copy with a destination that NGAS cannot see.
    """


class NormalModeFileExistsError(FetchError):
    """
    Shadows the built-in FileExistsError, but intended to be raised by the ForceMode.
    Not a retryable error: the file would still be in place on every retry.
    """


class NgasServiceErrorException(FetchError):
    """
    Problems that occur during fetching from NGAS are covered by this exception.
    """
diff --git a/apps/cli/executables/datafetcher/datafetcher/redesign/fetch_plan.py b/apps/cli/executables/datafetcher/datafetcher/redesign/fetch_plan.py new file mode 100644 index 0000000000000000000000000000000000000000..4861f8126ddba17ae8dfc384b83ac64ecb477675 --- /dev/null +++ b/apps/cli/executables/datafetcher/datafetcher/redesign/fetch_plan.py @@ -0,0 +1,59 @@ +"""
Fetch plans exist to manage running each of a group of FileFetchers. At the moment,
we have identified only two kinds of fetch plan:

 - Sequential fetching, where we do one fetcher and then the next until all are complete
 - Parallel fetching, where we allocate a thread pool of a certain size and send all of
   the fetchers to the pool to be run as threads become free

The size of the thread pool is the concurrency number. A ParallelFetchPlan with
concurrency = 7 will generate 7 threads and fetch up to 7 files at the same time.
This means if you had (say) N > 7 files to fetch, you would always have 7 of them
active until you reach the last 7, and then they would wind down to 0 as they
complete.

It turns out that sequential fetching is a degenerate case of parallel fetching, where
the concurrency limit is 1. So there is no need to implement a sequential fetcher, even
though it would be a trivial for loop. We can simply create a thread pool with
one worker and put all the tasks in that pool.
"""

import concurrent.futures
from typing import Union, List

# pylint: disable=E0401, E0402, E1136, R0903

from .exceptions import FetchError
from .interfaces import FileFetcher, FetchPlan


class ParallelFetchPlan(FetchPlan):
    """
    Fetch a number of sub-fetchers or sub-plans. Executes as many
    concurrently as permitted by the concurrency limit.
    """

    def __init__(self, fetchers: List[Union[FileFetcher, FetchPlan]], concurrency: int = 1):
        self.fetchers = fetchers
        self.concurrency = concurrency

    def fetch(self):
        """
        Carry out the fetch plan.
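        A sketch of typical use (f1, f2, f3 being any FileFetchers):

            plan = ParallelFetchPlan([f1, f2, f3], concurrency=2)
            plan.fetch()  # f1 and f2 start at once; f3 runs when a worker frees up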
+ + :return: + """ + # treat the list of fetchers as a "work log" and work down it + with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor: + # submit all the fetchers to the thread pool to be executed + futures = [executor.submit(fetcher.fetch) for fetcher in self.fetchers] + + # now we can wait for them all to complete + for future in concurrent.futures.as_completed(futures): + try: + # under normal operation, we won't care about the result of the + # fetch operation for these futures, only whether or not there + # is an exception + future.result() + except Exception as exc: + raise FetchError("Trouble with one of the fetchers", exc) from exc diff --git a/apps/cli/executables/datafetcher/datafetcher/redesign/fetcher_factory.py b/apps/cli/executables/datafetcher/datafetcher/redesign/fetcher_factory.py new file mode 100644 index 0000000000000000000000000000000000000000..2881842817de5a71700eef2497c97e85899828bf --- /dev/null +++ b/apps/cli/executables/datafetcher/datafetcher/redesign/fetcher_factory.py @@ -0,0 +1,86 @@ +""" Matchmaker, Matchmaker, make me a fetcher """ + +from typing import List, Union + +# pylint: disable=E0401, E0402, E1136, R0913 + +from .fetch_plan import ParallelFetchPlan +from .fetchers import ( + NgasStreamingFileFetcher, + NgasDirectCopyFileFetcher, + NgasModeDetectingFetcher, + RetryableFileFetcher, + OracleXmlFetcher, + DryRunFakeFileFetcher, +) +from .interfaces import FetcherFactory, FileFetcher, FetchPlan +from .locations import NgasFile, OracleXml +from .validators import SizeValidator, Crc32Validator, XmlWellFormedValidator + + +class ConfiguredFetcherFactory(FetcherFactory): + """ + A generic FetcherFactory which is configured by some arguments originating probably + in the CLI arguments. Pass this through the location report to get a fetch plan. + """ + + def __init__( + self, concurrency=1, streaming=False, direct_copy=False, dry_run=False, force=False + ): + # we only have one kind of OracleXmlFetcher + self.xml_fetcher = OracleXmlFetcher + + # in dry run mode, use fake fetchers + if dry_run: + self.file_fetcher = DryRunFakeFileFetcher + self.xml_fetcher = DryRunFakeFileFetcher + elif streaming: + self.file_fetcher = NgasStreamingFileFetcher + elif direct_copy: + self.file_fetcher = NgasDirectCopyFileFetcher + else: + self.file_fetcher = NgasModeDetectingFetcher + + self.concurrency = concurrency + self.force = force + + def fetch_ngas(self, file: NgasFile) -> FileFetcher: + """ + Fetch a file from NGAS, retrying a few times in the event of error + + :param file: the file to fetch + :return: + """ + return ( + RetryableFileFetcher(self.file_fetcher(file, force=self.force)) + .and_validate_with(SizeValidator(file.size)) + .and_validate_with(Crc32Validator(file.checksum)) + ) + + def fetch_oracle_xml(self, file: OracleXml) -> FileFetcher: + """ + Fetch an SDM table from the ALMA DB, retrying a few times in the event of error + + :param file: + :return: + """ + return ( + RetryableFileFetcher(self.xml_fetcher(file, force=self.force)) + .and_validate_with(SizeValidator(file.size)) + .and_validate_with(XmlWellFormedValidator()) + ) + + def fetch_plan( + self, fetchers: List[Union[FileFetcher, FetchPlan]], concurrency: int = None + ) -> FetchPlan: + """ + Decide whether or not to fetch in parallel. 
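        For example (hypothetical numbers): a factory configured with
        concurrency=16 yields a plan with concurrency 4 when 4 is requested,
        and a plan with concurrency 16 when no value is given.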
        :param fetchers: fetchers and/or fetch plans
        :param concurrency: number of concurrent jobs permitted
        :return:
        """
        if not concurrency:
            concurrency = self.concurrency
        # Take whichever is smaller of my permitted concurrency and the requested concurrency
        return ParallelFetchPlan(fetchers, concurrency=min(concurrency, self.concurrency))
diff --git a/apps/cli/executables/datafetcher/datafetcher/redesign/fetchers.py b/apps/cli/executables/datafetcher/datafetcher/redesign/fetchers.py new file mode 100644 index 0000000000000000000000000000000000000000..2d5310e93d223138efb92d279bce60a2fe38fa2a --- /dev/null +++ b/apps/cli/executables/datafetcher/datafetcher/redesign/fetchers.py @@ -0,0 +1,288 @@ +"""
This module concerns the Fetcher facilities. There are expected to be about
three of these:

 - NgasDirectCopyFileFetcher, which uses the NGAS direct copy plugin,
 - NgasStreamingFileFetcher, which uses the NGAS streaming mode, and
 - OracleXmlFetcher, which queries the ALMA database to fetch an SDM table

Each of these is fairly stupid and just tries to fetch its one file, one time.
Retrying is not handled here; the RetryableFileFetcher is used for that.
"""
import time
from enum import Enum
from pathlib import Path

# pylint: disable=E0401, E0402, W0221

import cx_Oracle
import sqlalchemy
from pycapo import CapoConfig

from .exceptions import (
    RetryableFetchError,
    UnsatisfiableConstraint,
    NormalModeFileExistsError,
)
from .interfaces import FileFetcher, LocatedFile
from .locations import NgasFile, OracleXml


class ForceMode(Enum):
    """
    ForceMode is either normal or force. In normal mode, we refuse to overwrite an
    existing file. In force mode, we are more than willing to destroy your old data.
    """

    NORMAL = False
    FORCE = True

    def check_destination(self, path: Path):
        """
        Raise NormalModeFileExistsError if we are in normal mode and the destination exists.
        """
        if self == self.NORMAL and path.exists():
            raise NormalModeFileExistsError(
                f"Cannot fetch to {path} because the file already exists."
            )


class NgasDirectCopyFileFetcher(FileFetcher):
    """
    Fetch a file using the NGAS direct copy mechanism.
    """

    def __init__(self, file: NgasFile, force=ForceMode.NORMAL):
        super().__init__()
        self._file = file
        self.force = force

    @property
    def file(self) -> NgasFile:
        return self._file

    def shares_resources_with(self, other: "FileFetcher") -> bool:
        return self.file.shares_resources_with(other.file)

    def do_fetch(self) -> Path:
        """
        Fetch a file from NGAS via direct copy, or die trying.

        :return: the fetched file
        """
        # ensure that we actually can fetch
        if not self.file.can_direct_copy():
            raise UnsatisfiableConstraint(f"Direct copy to {self.file.destination} is not possible")

        # check the destination
        self.force.check_destination(self.file.destination)

        # create the directory for this file
        self.file.prepare_destination()

        # direct copy the file
        self.file.direct_copy()

        return self.file.destination

    def __str__(self):
        return f"NGAS Direct Copy of {self.file}"


class NgasStreamingFileFetcher(FileFetcher):
    """
    Fetch a file using NGAS streaming.
    """

    def __init__(self, file: NgasFile, force=ForceMode.NORMAL):
        super().__init__()
        self._file = file
        self.force = force

    @property
    def file(self) -> NgasFile:
        return self._file

    def shares_resources_with(self, other: "FileFetcher") -> bool:
        return self.file.shares_resources_with(other.file)

    def do_fetch(self) -> Path:
        """
        Fetch a file from NGAS via streaming, or die trying.
+ + :return: the fetched file + """ + # check the destination + self.force.check_destination(self.file.destination) + + # create the directory for this file + self.file.prepare_destination() + + # run the streaming fetch + self.file.stream() + + return self.file.destination + + def __str__(self): + return f"NGAS streaming copy of {self.file}" + + +class OracleXmlFetcher(FileFetcher): + """ + Fetch a file by retrieving it from the ALMA database. + """ + + ALMA_CAPO_PREFIX = "almaMetadataDatabase.jdbc" + + engine = None + + def __init__(self, file: OracleXml, force=ForceMode.NORMAL): + super().__init__() + self._file = file + self.force = force + + @property + def file(self) -> OracleXml: + return self._file + + def shares_resources_with(self, other: "FileFetcher") -> bool: + return self.file.shares_resources_with(other.file) + + def do_fetch(self) -> Path: + """ + Retrieve an ALMA SDM table as XML from the ALMA DB. + + :return: the file we got + """ + # check the destination + self.force.check_destination(self.file.destination) + + # create the directory for this file + self.file.prepare_destination() + + engine = self.get_engine() + try: + result = list( + engine.execute( + f"SELECT (xml).getClobVal() FROM ALMA.{self.file.table} WHERE " + f"archive_uid = :archive_uid", + dict(archive_uid=self.file.archive_uid), + ) + )[0][0] + + Path(self.file.destination).write_text(result) + except Exception as ex: + raise RetryableFetchError("Something went wrong trying to fetch") from ex + finally: + engine.dispose() + + return self.file.destination + + @classmethod + def get_engine(cls): + """ + Get an engine against which queries can be run. + + :return: + """ + if cls.engine is None: + capo_config = CapoConfig() + user = capo_config[cls.ALMA_CAPO_PREFIX + "Username"] + passwd = capo_config[cls.ALMA_CAPO_PREFIX + "Password"] + url = capo_config[cls.ALMA_CAPO_PREFIX + "Url"] + + hostport, service_name = url.split("/")[-2:] + host, port = hostport.split(":") + dsn = cx_Oracle.makedsn(host, port, service_name=service_name) + archiveurl = f"oracle://{user}:{passwd}@{dsn}" + + cls.engine = sqlalchemy.create_engine(archiveurl) + return cls.engine + + def __str__(self): + return f"Oracle XML fetch of {self.file}" + + +class FileFetcherDecorator(FileFetcher): + """ + Decorator to allow us to interpose other behavior into a FileFetcher. 
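    Decorators nest around a concrete fetcher; for instance (a sketch):

        RetryableFileFetcher(NgasStreamingFileFetcher(file), retries=3)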
+ """ + + def __init__(self, underlying: FileFetcher): + super().__init__() + self.underlying = underlying + + @property + def file(self) -> LocatedFile: + return self.underlying.file + + def shares_resources_with(self, other: "FileFetcher") -> bool: + return self.underlying.shares_resources_with(other) + + def do_fetch(self) -> Path: + return self.underlying.do_fetch() + + def __str__(self): + return str(self.underlying) + + +class RetryableFileFetcher(FileFetcherDecorator): + """A FileFetcher that retries N times in the event of failure, waiting + longer each time""" + + def __init__(self, underlying: FileFetcher, retries=3): + super().__init__(underlying) + self.retries = retries + + def do_fetch(self, attempt=1) -> Path: + try: + return self.underlying.do_fetch() + except RetryableFetchError as r_err: + if attempt < self.retries: + # sleep for 2, 4, 8 seconds between attempts + time.sleep(2 ** attempt) + print(f"{self.underlying} (attempt #{attempt+1})") + return self.do_fetch(attempt + 1) + raise r_err + + def __str__(self): + return f"{self.underlying} (with up to {self.retries} retries)" + + +class DryRunFakeFileFetcher(FileFetcher): + """ + A fake fetcher we can use to implement dry runs and other tests + """ + + def __init__(self, file: LocatedFile, force=False): + super().__init__() + self._file = file + self.force = force + + @property + def file(self) -> LocatedFile: + return self._file + + def do_fetch(self) -> Path: + """ + In a dry run, we don't actually do any fetching. + """ + + def shares_resources_with(self, other: "FileFetcher") -> bool: + return self.file.shares_resources_with(other.file) + + def __str__(self): + return f"Fake fetch of {self.file}{' (forced)' if self.force else ''}" + + +class NgasModeDetectingFetcher(FileFetcherDecorator): + """ + This fetcher is used when the user has no preference between streaming and direct-copy. + The fetcher itself looks at the file and the destination and makes a determination about + which one to use. + """ + + def __init__(self, file: NgasFile, force=ForceMode.NORMAL): + # If the user has no preference, they would prefer direct copy + # if it is possible because it's faster + if file.can_direct_copy(): + super().__init__(NgasDirectCopyFileFetcher(file, force=force)) + else: + super().__init__(NgasStreamingFileFetcher(file, force=force)) diff --git a/apps/cli/executables/datafetcher/datafetcher/redesign/interfaces.py b/apps/cli/executables/datafetcher/datafetcher/redesign/interfaces.py new file mode 100644 index 0000000000000000000000000000000000000000..c4dbb5267502e7909ac1bfb42e6b86490baaf352 --- /dev/null +++ b/apps/cli/executables/datafetcher/datafetcher/redesign/interfaces.py @@ -0,0 +1,220 @@ +""" +This module defines the interfaces that the rest of the redesign uses. + +1. LocationReport constitutes all the information about the products we will be fetching +2. LocatedFile is an entry in the LocationReport for a specific file +3. FileFetcher is the interface for fetching a single file +4. FileValidator is the interface for validating a file to ensure it was fetched correctly +5. FetcherFactory is a bridge between the LocationReport and a sequence of FileFetchers, + which feeds the next steps +6. FetchPlan aggregates FileFetchers together + +The plan of operation is expressed by these interfaces like so: + +1. A LocationReport is produced +2. The FetcherFactory is given to the LocationReport, to create a series of FileFetchers +3. The FileFetchers are aggregated into a FetchPlan +4. 
The FetchPlan is executed

"""

# pylint: disable=C0116, E0239, E1101, R0903, W0107

from pathlib import Path
from abc import ABC, abstractmethod
from typing import List, Union, NamedTuple

import abc


class LocatedFile(abc.ABC):
    """
    Located files know some information about themselves, which gives them two important capabilities:

    1. Knowing how to construct a FileFetcher for them, given a FetcherFactory, and
    2. Knowing whether or not they share resources with some other LocatedFile.

    The exact details of the LocatedFile will vary as the location report changes over time, but
    the canonical implementations right now are OracleXml and NgasFile.
    """

    @abc.abstractmethod
    def make_fetcher(self, factory: "FetcherFactory") -> "FileFetcher":
        pass

    @abc.abstractmethod
    def shares_resources_with(self, other: "LocatedFile") -> bool:
        pass

    @property
    def destination(self) -> Path:
        """
        Most LocatedFiles have a subdirectory and relative_path, and this is how you piece them
        together to form the delivery destination.
        :return:
        """
        return Path(f"{self.subdirectory}/{self.relative_path}")

    def prepare_destination(self):
        """
        Ensure that the directory we need exists before we try to write to it.
        """
        self.destination.parent.mkdir(parents=True, exist_ok=True)


class FileFetcher(ABC):
    """
    A FileFetcher fetches a single file from some external system.
    FileFetchers can have Validators attached to them, which will
    be called in order after the fetch completes to validate that
    the fetch was successful.

    The FetchPlan system relies on FileFetchers knowing when they
    require shared access to an external resource. FileFetchers
    implement a method to indicate whether they share resources,
    and if they do, FetchPlan groups them together to limit their
    concurrency. FetchPlan assumes that FileFetchers are disjoint
    in their use of shared resources; i.e., that resource sharing
    partitions the fetchers into non-overlapping groups.
    """

    def __init__(self):
        self.validators = []

    @property
    @abstractmethod
    def file(self) -> LocatedFile:
        """
        Return the located file used by this fetcher

        :return: LocatedFile used by this fetcher
        """
        pass

    @abstractmethod
    def do_fetch(self) -> Path:
        """
        Fetches something.

        :return: result
        :raise: RetryableFetchError if something fails that can be retried
        :raise: FetchError in other cases when something goes sideways
        """
        pass

    @abstractmethod
    def shares_resources_with(self, other: "FileFetcher") -> bool:
        """
        True if this and the other FileFetcher access the same shared resources
        (such as a web host or database).

        FetchPlan assumes that resource sharing is a transitive property,
        i.e., that if fetcher A and fetcher B share resources, and so do
        fetchers A and C, then fetchers B and C share resources as well.

        :param other: another FileFetcher
        :return: true if these two use the same shared resources
        """
        pass

    def and_validate_with(self, validator: "FileValidator") -> "FileFetcher":
        """
        Attach a validator to this FileFetcher. Can be called multiple times to attach many validators.

        :param validator: validator to attach
        :return: self, so that calls can be chained
        """
        self.validators.append(validator)
        return self

    def fetch(self):
        """
        Fetch a file and validate it.
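        Validators run in the order they were attached; a typical chain (sketch):

            (fetcher
             .and_validate_with(SizeValidator(size))
             .and_validate_with(Crc32Validator(checksum))
             .fetch())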
        :return: path to the newly fetched file, if successful
        :raise: FileValidationFault if the fetched file fails validation
        """
        # do the fetch
        print(str(self))
        result = self.do_fetch()

        # now validate with our validators
        if result is None:
            print("No file fetched, skipping validation")
        else:
            for validator in self.validators:
                validator.validate(result)

        # if we made it here, we have a result
        return result


class FetcherFactory(ABC):
    """
    As the name suggests, a FetcherFactory is a device for constructing FileFetchers.
    The LocationReport is able to use this to produce a list of FileFetchers of various types,
    possibly according to settings provided by the user at the command-line.
    """

    @abstractmethod
    def fetch_ngas(self, file: LocatedFile) -> FileFetcher:
        """
        Fetch a single file from NGAS

        :param file: the file to fetch
        :return: a FileFetcher that retrieves that file
        """
        pass

    @abstractmethod
    def fetch_oracle_xml(self, file: LocatedFile) -> FileFetcher:
        """
        Fetch a single ALMA SDM from their Oracle database.

        :param file: SDM as XML from ALMA DB
        :return: a FileFetcher that retrieves that table
        """
        pass

    @abstractmethod
    def fetch_plan(
        self, fetchers: List[Union[FileFetcher, "FetchPlan"]], concurrency: int = 1
    ) -> "FetchPlan":
        pass


class FileValidator(ABC):
    """
    A FileValidator examines a file on disk and verifies that it passes muster
    in some implementation-dependent manner.
    """

    @abstractmethod
    def validate(self, path: Path):
        """
        Validates a file on the filesystem. If the file is OK, the validator will simply return;
        if the file is not OK an exception will be raised.

        :raise FileValidationFault
        """
        pass


class LocationReport(NamedTuple):
    """ Represents a locations report from archiveService """

    files: List[LocatedFile]
    aggregate_size: int

    def fetchers(self, factory: FetcherFactory) -> List[FileFetcher]:
        return [file.make_fetcher(factory) for file in self.files]


class FetchPlan(ABC):
    """
    A fetch plan is a lot like a FileFetcher, except it potentially does many files
    and thus we have no specific return value from the fetch method.
    """

    @abstractmethod
    def fetch(self):
        pass
diff --git a/apps/cli/executables/datafetcher/datafetcher/redesign/locations.py b/apps/cli/executables/datafetcher/datafetcher/redesign/locations.py new file mode 100644 index 0000000000000000000000000000000000000000..04a8e407cd08b0cbfbe600e8ac66a141263f879b --- /dev/null +++ b/apps/cli/executables/datafetcher/datafetcher/redesign/locations.py @@ -0,0 +1,264 @@ +""" LocationsReport conveniences """

from abc import abstractmethod, ABC
from enum import Enum
from pathlib import Path
from typing import NamedTuple, Optional


# pylint: disable=E0239, E0401, E0402, E1136, R0201, R0902, R0903, R0913, W0613

from marshmallow import Schema, fields, post_load

from .interfaces import LocatedFile, LocationReport
from .ngas import NgasConnection


class Location(Enum):
    """
    Where the files live
    """

    DSOC = "DSOC"
    NAASC = "NAASC"

    @property
    def lustre_prefix(self) -> str:
        """
        The expected Lustre path prefix for this location
        """
        return "/lustre/aoc" if self == self.DSOC else "/lustre/naasc"

    @staticmethod
    def in_location(path: Path) -> Optional["Location"]:
        """
        Given a path, return the location it appears to be in, or nothing,
        if you're out in space somewhere.
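        For example, Location.in_location(Path("/lustre/aoc/foo")) is Location.DSOC,
        while a path under neither Lustre tree yields None.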
+ + :param path: the path to test + :return: a Location, optionally + """ + + # look through the locations until we find one that seems to match + for location in Location: + if str(path.absolute()).startswith(location.lustre_prefix): + return location + + return None + + +class Cluster(Enum): + """ + Which cluster the files are on + """ + + DSOC = "DSOC" + NAASC = "NAASC" + + +class NgasServer(NamedTuple): + """ + Encapsulates the "server" section of a "files" item in locations report + """ + + server: str + location: Location + cluster: Cluster + + def can_direct_copy_to(self, dest: Path) -> bool: + """ + True if we can direct copy to this destination. + + In principle, direct copy is possible if the file's NGAS server and the destination + are in the same datacenter, but it also depends on the correct version of NGAS and + the NGAS direct copy plugin, which is not supported by the NAASC cluster at the moment. + So there is a test in here which will need to be removed eventually, to ensure that + the DSOC cluster is accounted for. + + :param dest: the destination to write to + :return: true if direct copy is possible + """ + + return self.cluster == Cluster.DSOC and self.location == Location.in_location(dest) + + def connect(self) -> NgasConnection: + """ + Connect to this NGAS server and return an NgasConnection we can use to send requests to. + + :return: a connection to NGAS + """ + return NgasConnection(self.server) + + def __str__(self): + return f"NGAS {self.server} at {self.location}" + + +class NgasFile(LocatedFile): + """Encapsulates all the information about an NGAS file in a locations report""" + + ngas_file_id: str + subdirectory: str + relative_path: str + checksum: int + checksum_type: str + version: int + size: int + server: NgasServer + + def __init__( + self, + ngas_file_id, + subdirectory, + relative_path, + checksum, + checksum_type, + version, + size, + server, + ): + super().__init__() + self.ngas_file_id = ngas_file_id + self.subdirectory = subdirectory + self.relative_path = relative_path + self.checksum = checksum + self.checksum_type = checksum_type + self.version = version + self.size = size + self.server = server + + def make_fetcher(self, factory: "FetcherFactory") -> "FileFetcher": + return factory.fetch_ngas(self) + + def shares_resources_with(self, other: "LocatedFile") -> bool: + return hasattr(other, "server") and self.server == other.server + + def can_direct_copy(self): + """ + True if this file can be direct copied from NGAS. + """ + return self.server.can_direct_copy_to(self.destination) + + def direct_copy(self): + """ + Direct copy this file to its destination + + :return: None, if successful + """ + with self.server.connect() as connection: + connection.direct_copy(self.ngas_file_id, self.version, self.destination) + + def stream(self): + """ + Stream this file into its destination. 
+ + :return: None, if successful + """ + with self.server.connect() as connection: + connection.stream(self.ngas_file_id, self.version, self.destination) + + def __str__(self): + return f"NGAS {self.ngas_file_id} from {self.server} -> {self.subdirectory}/{self.relative_path}" + + +class OracleXml(LocatedFile): + """ Represents the metadata of an ALMA SDM stored as XML in the ALMA DB """ + + archive_uid: str + table: str + subdirectory: Path + relative_path: str + size: int + + def __init__(self, archive_uid, table, subdirectory, relative_path, size): + super().__init__() + self.archive_uid = archive_uid + self.table = table + self.subdirectory = subdirectory + self.relative_path = relative_path + self.size = size + + def make_fetcher(self, factory: "FetcherFactory") -> "FileFetcher": + return factory.fetch_oracle_xml(self) + + def shares_resources_with(self, other: "LocatedFile") -> bool: + # all AlmaSdms use the same resource + return hasattr(other, "table") + + def __str__(self): + return f"ALMA {self.archive_uid} @ {self.table} -> {self.subdirectory}/{self.relative_path}" + + +class Locator(ABC): + """ Obtains a location report in various ways """ + + @abstractmethod + def locate(self) -> LocationReport: + """ + Obtain the LocationReport we need to proceed. + + :return: a LocationReport for where all the files we need are + """ + + +class FileLocator(Locator): + """ Loads a locations report from a .json report file """ + + def __init__(self, file: Path): + self.file = file + + def locate(self) -> LocationReport: + return LocationReportSchema().loads(self.file.read_text()) + + +class ServiceLocator(Locator): + """ Acquires a locations report from the archiveService """ + + def __init__(self, science_product_locator: str): + self.spl = science_product_locator + + def locate(self) -> LocationReport: + raise NotImplementedError + + +class NgasServerSchema(Schema): + """ marshmallow schema to interpret "server" section of a "files" item """ + + server = fields.Str() + location = fields.Str() + cluster = fields.Str() + + @post_load + def make_ngas_server(self, data, **kwargs): + return NgasServer(data["server"], Location[data["location"]], Cluster[data["cluster"]]) + + +class NgasFileSchema(Schema): + """ One of the items in a location report's "files" list """ + + ngas_file_id = fields.Str() + subdirectory = fields.Str() + relative_path = fields.Str() + checksum = fields.Integer() + checksum_type = fields.Str() + version = fields.Integer() + size = fields.Integer() + server = fields.Nested(NgasServerSchema()) + table = fields.Str() + archive_uid = fields.Str() + + @post_load + def make_filespec(self, data, **kwargs): + if "table" in data: + return OracleXml(**data) + return NgasFile(**data) + + +class LocationReportSchema(Schema): + """ Encapsulates an entire locations report """ + + files = fields.List(fields.Nested(NgasFileSchema())) + aggregate_size = fields.Integer() + + @post_load + def make_report(self, data, **kwargs): + return LocationReport(**data) diff --git a/apps/cli/executables/datafetcher/datafetcher/redesign/ngas.py b/apps/cli/executables/datafetcher/datafetcher/redesign/ngas.py new file mode 100644 index 0000000000000000000000000000000000000000..36a611c8892f9e04323f3bed4f84920187628adb --- /dev/null +++ b/apps/cli/executables/datafetcher/datafetcher/redesign/ngas.py @@ -0,0 +1,116 @@ +""" +Abstracts the work of talking to NGAS. 
Clients of NGAS should not have to know +a lot about the way NGAS requests work, especially since there are just a couple +things you can do with a request (stream a file to disk or direct copy a file +to disk). + +NgasConnection manages that for you; all you need to bring to the table is the +file ID, the file version, and where you want the file written to disk. + +In principle it should be less expensive to call these multiple times reusing +the same connection, but in practice I doubt the savings is meaningful, and the +current product-fetcher design does not reuse the connection anyway. +""" + +# pylint: disable=E0401, E0402 + +import http +from contextlib import AbstractContextManager +from pathlib import Path +from xml.etree import ElementTree + +import requests + +from .exceptions import RetryableFetchError, FetchError, NgasServiceErrorException + +SERVICE_EXC_TEXT = "Service exception" + + +class NgasConnection(AbstractContextManager): + """ + This class makes it easier to deal with NGAS connections. + """ + + def __init__(self, server): + self.session = requests.Session() + self.download_url = f"http://{server}/RETRIEVE" + + def stream(self, file_id: str, file_version: int, destination: Path): + """ + Stream the request to disk. Throws an exception if it fails. + + :param file_id: the file to stream + :param file_version: the version of the file to request + :param destination: where to place the result + :return: None + """ + # this is the default behavior of NGAS, so we just make a web request and + # it should be fine + params = { + "file_id": file_id, + "file_version": file_version, + } + + response = self.get(params=params, stream=True) + with open(destination, "wb") as file_to_write: + for chunk in response.iter_content(chunk_size=8192): + file_to_write.write(chunk) + + def direct_copy(self, file_id, file_version, destination): + """ + Direct copy a file to a destination. Throws an exception if it fails. + + :param file_id: the file to copy + :param file_version: the version of the file to request + :param destination: where to place the result + :return: + """ + + # the trick here is to just specify the output file when providing the direct copy + # plugin + params = { + "file_id": file_id, + "file_version": file_version, + "processing": "ngamsDirectCopyDppi", + "processingPars": "outfile=" + str(destination.absolute()), + } + + self.get(params=params) + + def get(self, **kwargs) -> requests.Response: + """ + Perform an NGAS request. The keyword arguments flow through to the + underlying requests.get(). 
+ + :param kwargs: additional arguments to the session.get() + :return: a response object + """ + # run the request + response = self.session.get(self.download_url, **kwargs) + + # check the response and make sure it's 200 OK + # if it is not, we parse out the NGAMS Status Message, which + # feeds an exception we throw about how we couldn't do this request + if response.status_code != http.HTTPStatus.OK: + doc = ElementTree.fromstring(response.text) + ngams_status = doc.find("Status").attrib["Message"] + + ex = NgasServiceErrorException( + { + "status_code": response.status_code, + "url": response.url, + "reason": response.reason, + "message": ngams_status, + } + ) + + # No point in retrying a 404 Not Found + if response.status_code == http.HTTPStatus.NOT_FOUND: + raise FetchError(SERVICE_EXC_TEXT, ex) + + # otherwise, maybe, yeah + raise RetryableFetchError(SERVICE_EXC_TEXT, ex) + return response + + def __exit__(self, exc_type, exc_val, exc_tb): + self.session.close() diff --git a/apps/cli/executables/datafetcher/datafetcher/redesign/product_fetcher.py b/apps/cli/executables/datafetcher/datafetcher/redesign/product_fetcher.py new file mode 100644 index 0000000000000000000000000000000000000000..b2ab547e7f4de1449eaaac911410ccdaf126de00 --- /dev/null +++ b/apps/cli/executables/datafetcher/datafetcher/redesign/product_fetcher.py @@ -0,0 +1,238 @@ +""" Command-line parsing conveniences """ +import argparse +from enum import Enum +from pathlib import Path +from typing import List + +# pylint: disable=E0401, E0402, R0913 + +from .fetcher_factory import ConfiguredFetcherFactory +from .fetchers import ForceMode +from .interfaces import FetchPlan, FetcherFactory, FileFetcher +from .locations import FileLocator, ServiceLocator, Locator + + +class RetrievalMode(Enum): + """ + How we're retrieving a file: via streaming or via direct copy (plugin) + """ + + STREAM = "stream" + COPY = "copy" + + +class CLIParam(Enum): + """ Codifies productfetcher's various command-line parameters """ + + SPL = "--product-locator" + FILE = "--location-file" + DRY = "--dry-run" + FORCE = "--force" + DIRECT_COPY = "--direct-copy" + STREAMING = "--streaming" + CONCURRENCY = "--concurrency" + + +# Prologue and epilogue for the command line parser. +_PROLOGUE = """Retrieve a product (a science product or an ancillary product) +from the NRAO archive, either by specifying the product's locator or by +providing the path to a product locator report.""" + + +class FetchContext: + """ Handles the various command-line options """ + + def __init__( + self, + locator: Locator, + streaming: bool, + direct_copy: bool, + dry_run=False, + force=False, + concurrency=1, + ): + self.locator = locator + self.streaming = streaming + self.direct_copy = direct_copy + self.dry_run = dry_run + self.force = force + self.concurrency = concurrency + + def generate_plan(self) -> FetchPlan: + """ + Generate the fetch plan for the entire product. 
+ + :return: a plan + """ + factory = ConfiguredFetcherFactory( + concurrency=self.concurrency, + streaming=self.streaming, + direct_copy=self.direct_copy, + dry_run=self.dry_run, + force=self.force, + ) + + # First we must prepare the fetchers for the location report + all_fetchers = self.locator.locate().fetchers(factory) + + # now we have enough stuff to proceed with generating the plan + return self.calculate_plan(factory, all_fetchers) + + @staticmethod + def calculate_plan(factory: FetcherFactory, all_fetchers: List[FileFetcher]): + """ + Calculates the fetch plan, given a list of fetchers to aggregate + and a factory for generating fetchers. + + :param factory: + :param all_fetchers: + :return: + """ + # The next step is to group like with like in terms of file fetcher resources. + # We do this by creating a group each time we find a fetcher that + # does not share resources with a "prototypical instance" from each + # of the other groups. + fetcher_classes = {} + for fetcher in all_fetchers: + found = False + for class_prototype in fetcher_classes.keys(): + if fetcher.shares_resources_with(class_prototype): + found = True + fetcher_classes[class_prototype].append(fetcher) + + if not found: + # if we made it here, we did not find a class prototype we share resources with + # therefore, we are the prototype of a new class + fetcher_classes[fetcher] = [fetcher] + + # now that we have sorted the fetchers into categories based on which one share resources + # we can assume that within each category we want to see a certain kind of parallelism + plans = [] + for fetch_class in fetcher_classes.values(): + plans.append(factory.fetch_plan(fetch_class)) + + # now the overall plan is basically to execute each of those plans in parallel + return factory.fetch_plan(plans, len(plans)) + + @staticmethod + def parse_commandline(args: List[str]) -> "FetchContext": + """ + Parse the command line to build a FetchContext that handles options, + if any + + :param args: + :return: + """ + namespace = FetchContext.arg_parser().parse_args(args) + + # determine the locator + locator = ( + FileLocator(Path(namespace.location_file)) + if namespace.location_file + else ServiceLocator(namespace.product_locator) + ) + + return FetchContext( + locator, + namespace.streaming, + namespace.direct_copy, + namespace.dry_run, + ForceMode(namespace.force), + namespace.concurrency, + ) + + @staticmethod + def arg_parser() -> argparse.ArgumentParser: + """ + Build and return an argument parser with the command line options + for data-fetcher. This is static because Sphinx needs it to build + the docs. + + Note difference from previous implementations: Capo profile, + when needed, will be obtained from the environment; + files will be retrieved to the current directory. + + Normally, whether to use the NGAS direct-copy plugin to retrieve + a file is decided on the basis of a file's execution site, + location, and cluster; the --streaming and --direct-copy flag can + force one method or the other. If direct-copy is specified but + not possible, an error will be thrown. + + :return: an argparse 'parser' with command line options for productfetcher. 
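        Example invocations (the locator is the sample from ARCHITECTURE.md; the
        location file is any saved locations report):

            productfetcher --product-locator uid://evla/execblock/27561b56-4c6a-4614-bc26-67e436b5e92c
            productfetcher --location-file report.json --concurrency 4 --dry-run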
+ """ + + parser = argparse.ArgumentParser( + description=_PROLOGUE, + formatter_class=argparse.RawTextHelpFormatter, + ) + + parser.add_argument( + CLIParam.CONCURRENCY.value, + action="store", + dest="concurrency", + help="maxmimum threads per plan (set to 1 for no threading)", + type=int, + default=16, + ) + + exclusive_group = parser.add_mutually_exclusive_group(required=True) + + exclusive_group.add_argument( + CLIParam.SPL.value, + action="store", + dest="product_locator", + help="product locator to download", + ) + exclusive_group.add_argument( + CLIParam.FILE.value, + action="store", + dest="location_file", + help="product locator report (in JSON)", + ) + + dry_or_force_options = parser.add_mutually_exclusive_group(required=False) + dry_or_force_options.add_argument( + CLIParam.DRY.value, + action="store_true", + dest="dry_run", + help="dry run; do not fetch product", + ) + + dry_or_force_options.add_argument( + CLIParam.FORCE.value, + action="store_true", + dest="force", + help="overwrite existing file(s) at dest", + ) + + copy_or_stream_options = parser.add_mutually_exclusive_group(required=False) + copy_or_stream_options.add_argument( + CLIParam.DIRECT_COPY.value, + action="store_true", + dest="direct_copy", + help="force direct-copy from NGAS", + ) + copy_or_stream_options.add_argument( + CLIParam.STREAMING.value, + action="store_true", + dest="streaming", + help="force streaming from NGAS", + ) + + return parser + + +def main(args=None): + """CLI entry point""" + # parse the arguments + context = FetchContext.parse_commandline(args) + + # generate the plan + plan = context.generate_plan() + + # perform the fetch + plan.fetch() + + +if __name__ == "__main__": + main() diff --git a/apps/cli/executables/datafetcher/datafetcher/redesign/validators.py b/apps/cli/executables/datafetcher/datafetcher/redesign/validators.py new file mode 100644 index 0000000000000000000000000000000000000000..271f9648fc10f4e25416c8e600b980ac66d2334b --- /dev/null +++ b/apps/cli/executables/datafetcher/datafetcher/redesign/validators.py @@ -0,0 +1,87 @@ +""" +The validation system is fairly small and straightforward. Each FileValidator +is given whatever special parameters it needs in the constructor, and then +exposes a validate method that validates a file path according to whatever +its special sauce is. + +We have three of these at the moment: + + - SizeValidator ensures that the file is the expected size (in bytes) + - Crc32Validator calculates the crc32 of a file and makes sure it matches + the expected value. NGAS keeps these so we happen to have them on-hand + - XmlValidator ensures that an XML file is well-formed + +""" + +# pylint: disable=E0401, E0402, R0201, R0903 +import binascii +from pathlib import Path +import xml.sax + +from .exceptions import FileValidationFault +from .interfaces import FileValidator + + +class SizeValidator(FileValidator): + """ + Validates the size of a file in bytes. If the expected and actual sizes match, + the file validates. + """ + + def __init__(self, size: int): + self.size = size + + def validate(self, path: Path): + # check the size + actual_size = path.stat().st_size + if actual_size != self.size: + raise FileValidationFault( + f"File size mismatch: expected {self.size} but received {actual_size}" + ) + print(f"File size matched for {path}") + + +class Crc32Validator(FileValidator): + """ + Validates the cksum of a file. This is an algorithm used by NGAS. + If the content of the file generates the same cksum we expect, + the file passes validation. 
+ """ + + def __init__(self, checksum: int): + self.checksum = checksum + + def validate(self, path: Path): + # there are a couple ways to do this, another is with the zlib module + # the important thing is reading this a chunk at a time + checksum = binascii.crc32(b"") + with path.open("rb") as content: + # read1() guarantees that only a single system read() call is made + # this should be more efficient than reading the entire thing into memory at once + # 0 is returned when we run out of stuff to read + while len(chunk := content.read1()) != 0: + checksum = binascii.crc32(chunk, checksum) + + # the magic bitmath here is courtesy of the Python documentation: + # Python 2 generated signed numbers here, but Python 3 generates unsigned numbers instead + if checksum != (self.checksum & 0xFFFFFFFF): + raise FileValidationFault( + f"CRC32 failed: expected {self.checksum & 0xFFFFFFFF}, got {checksum} instead" + ) + print(f"CRC32 matched at {path}") + + +class XmlWellFormedValidator(FileValidator): + """ + Validates that a file is well-formed XML. ALMA SDM files happen + to be XML and can be validated this way. + """ + + def validate(self, path: Path): + try: + parser = xml.sax.make_parser() + parser.setContentHandler(xml.sax.handler.ContentHandler()) + parser.parse(path) + print(f"XML valid at {path}") + except Exception as exc: + raise FileValidationFault("XML is not well-formed", exc) from exc diff --git a/apps/cli/executables/datafetcher/setup.cfg b/apps/cli/executables/datafetcher/setup.cfg index a392d237b96105d67126888c64caed04dd90bc20..9ee4ebf30d666131e8c42307f55f400bcd92f8ee 100644 --- a/apps/cli/executables/datafetcher/setup.cfg +++ b/apps/cli/executables/datafetcher/setup.cfg @@ -1,5 +1,5 @@ [metadata] -description-file = README.txt +description_file = README.txt [aliases] test=pytest diff --git a/apps/cli/executables/datafetcher/setup.py b/apps/cli/executables/datafetcher/setup.py index 88e4f2ff3a4e2540001b1723162ed776ac9fcf6c..c85d44d307fee3b42c57f99d70df03125f1da7ad 100644 --- a/apps/cli/executables/datafetcher/setup.py +++ b/apps/cli/executables/datafetcher/setup.py @@ -3,12 +3,15 @@ from pathlib import Path -from setuptools import setup +from setuptools import setup, find_packages VERSION = open("datafetcher/_version.py").readlines()[-1].split()[-1].strip("\"'") README = Path("README.md").read_text() requires = [ + "marshmallow>=3.12.1,<3.13", + "cx_Oracle>=8.1.0,<8.2", + "sqlalchemy==1.3.23", "pika>=1.1,<2", "pycapo>=0.3.0,<1.0", "beautifulsoup4>=4.9.1,<5.0", @@ -28,9 +31,14 @@ setup( url="TBD", license="GPL", install_requires=requires, - tests_require=["pytest", "pytest-resource-path"], + tests_require=["pytest", "pytest-resource-path", "requests-mock"], keywords=[], - packages=["datafetcher"], + packages=find_packages(), classifiers=["Programming Language :: Python :: 3.8"], - entry_points={"console_scripts": ["datafetcher = datafetcher.datafetcher:main"]}, + entry_points={ + "console_scripts": [ + "datafetcher = datafetcher.datafetcher:main", + "productfetcher = datafetcher.redesign.product_fetcher:main", + ] + }, ) diff --git a/apps/cli/executables/datafetcher/tests/redesign/test_fetch_plans.py b/apps/cli/executables/datafetcher/tests/redesign/test_fetch_plans.py new file mode 100644 index 0000000000000000000000000000000000000000..dd481a60f0f8eb0e76a2bd1ded25ef45a433e2ae --- /dev/null +++ b/apps/cli/executables/datafetcher/tests/redesign/test_fetch_plans.py @@ -0,0 +1,86 @@ +""" Test our fetch plans """ + +import pathlib +from unittest.mock import MagicMock + +# pylint: 
disable=C0116, E0401, E0402, R0201 + +import pytest + +from datafetcher.redesign.exceptions import FetchError +from datafetcher.redesign.fetch_plan import ParallelFetchPlan +from datafetcher.redesign.interfaces import FileFetcher, LocatedFile + + +class AlwaysSuccessfulFakeFetcher(FileFetcher): + """ I can always fetch -- just ask me! """ + + @property + def file(self) -> LocatedFile: + raise NotImplementedError + + def shares_resources_with(self, other: "FileFetcher") -> bool: + raise NotImplementedError + + def do_fetch(self) -> pathlib.Path: + return pathlib.Path.home() + + +class AlwaysFailingFakeFetcher(FileFetcher): + """No matter how I try, I can't fetch. I'm pathetic. My life is + meaningless.""" + + @property + def file(self) -> LocatedFile: + raise NotImplementedError + + def shares_resources_with(self, other: "FileFetcher") -> bool: + raise NotImplementedError + + def fetch(self): + self.do_fetch() + + def do_fetch(self) -> pathlib.Path: + raise FetchError("I always fail at this for some reason") + + +def test_parallel_plans_execute_all_fetchers(capsys): + """ + Make a bunch of fetchers and confirm the fetch plan invokes them all. + + :param capsys: + :return: + """ + + def with_fetchers_and_concurrency(fetcher_count: int, concurrency_level: int): + # Prove that we always executed all the fetchers with no concurrency whatsoever + fetchers = [MagicMock(wraps=AlwaysSuccessfulFakeFetcher()) for _ in range(fetcher_count)] + ParallelFetchPlan(fetchers, concurrency=concurrency_level).fetch() + for fetcher in fetchers: + fetcher.fetch.assert_called() + + with_fetchers_and_concurrency(10, 1) + with_fetchers_and_concurrency(16, 4) + with_fetchers_and_concurrency(4, 16) + capsys.readouterr() + + +def test_failures_always_found(capsys): + """ + We should always fail when we expect to fail. 
+ + :param capsys: + :return: + """ + fetchers = [MagicMock(wraps=AlwaysSuccessfulFakeFetcher()) for _ in range(7)] + + # no matter what concurrency level or where in the list we have an exception, + # it should always cause the overall fetch plan to throw an exception + for i in range(len(fetchers)): + fetchers_copy = fetchers[:] + fetchers_copy.insert(i, AlwaysFailingFakeFetcher()) + for concurrency_level in range(1, len(fetchers_copy)): + with pytest.raises(FetchError): + ParallelFetchPlan(fetchers_copy, concurrency=concurrency_level).fetch() + + capsys.readouterr() diff --git a/apps/cli/executables/datafetcher/tests/redesign/test_fetcher_factory.py b/apps/cli/executables/datafetcher/tests/redesign/test_fetcher_factory.py new file mode 100644 index 0000000000000000000000000000000000000000..2f3901ffb377c5bfc617ff09a980e172df03cab8 --- /dev/null +++ b/apps/cli/executables/datafetcher/tests/redesign/test_fetcher_factory.py @@ -0,0 +1,56 @@ +""" Make sure our fetcher factory functions fine, friend """ + +from typing import NamedTuple + +# pylint: disable=E0239, E0401, E0402, R0201, R0903 + +from datafetcher.redesign.fetcher_factory import ConfiguredFetcherFactory +from datafetcher.redesign.fetchers import ( + DryRunFakeFileFetcher, + NgasStreamingFileFetcher, + NgasDirectCopyFileFetcher, + NgasModeDetectingFetcher, +) + + +class FakeFile(NamedTuple): + """ Ceci n’est pas une file """ + + size: int + checksum: int + + # direct copy uses this + def can_direct_copy(self) -> bool: + return True + + +def test_configured_fetcher_factory(): + """ + Various tests of the fetcher factory + + :return: + """ + fake_file = FakeFile(239823983, -329831113) + + # let's try a few combinations and make sure we find reasonable things + # first, let's make sure the concurrency value is used properly + factory = ConfiguredFetcherFactory(concurrency=732) + assert factory.fetch_plan([]).concurrency == 732 + + # let's also make sure that if we specify a lower concurrency value, it gets used instead + assert factory.fetch_plan([], concurrency=3).concurrency == 3 + + # now let's make sure that if we ask for a dry run, that's what we get + factory = ConfiguredFetcherFactory(dry_run=True) + assert isinstance(factory.fetch_ngas(fake_file).underlying, DryRunFakeFileFetcher) + assert isinstance(factory.fetch_oracle_xml(fake_file).underlying, DryRunFakeFileFetcher) + + # now let's make sure that if we ask for streaming or direct copy, we get those + factory = ConfiguredFetcherFactory(streaming=True) + assert isinstance(factory.fetch_ngas(fake_file).underlying, NgasStreamingFileFetcher) + factory = ConfiguredFetcherFactory(direct_copy=True) + assert isinstance(factory.fetch_ngas(fake_file).underlying, NgasDirectCopyFileFetcher) + + # and let's make sure that if we don't specify streaming or direct copy, we get a detection + factory = ConfiguredFetcherFactory(streaming=False, direct_copy=False) + assert isinstance(factory.fetch_ngas(fake_file).underlying, NgasModeDetectingFetcher) diff --git a/apps/cli/executables/datafetcher/tests/redesign/test_fetchers.py b/apps/cli/executables/datafetcher/tests/redesign/test_fetchers.py new file mode 100644 index 0000000000000000000000000000000000000000..25af84a692291f45f4fd33248933b90486441f6d --- /dev/null +++ b/apps/cli/executables/datafetcher/tests/redesign/test_fetchers.py @@ -0,0 +1,181 @@ +""" Tests for our various file fetchers """ + +# pylint: disable=C0103, E0401, E0402, W0621 + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from 
diff --git a/apps/cli/executables/datafetcher/tests/redesign/test_fetchers.py b/apps/cli/executables/datafetcher/tests/redesign/test_fetchers.py
new file mode 100644
index 0000000000000000000000000000000000000000..25af84a692291f45f4fd33248933b90486441f6d
--- /dev/null
+++ b/apps/cli/executables/datafetcher/tests/redesign/test_fetchers.py
@@ -0,0 +1,181 @@
+""" Tests for our various file fetchers """
+
+# pylint: disable=C0103, E0401, E0402, W0621
+
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from datafetcher.redesign.exceptions import (
+    RetryableFetchError,
+    NormalModeFileExistsError,
+    UnsatisfiableConstraint,
+)
+from datafetcher.redesign.fetchers import (
+    RetryableFileFetcher,
+    NgasDirectCopyFileFetcher,
+    NgasStreamingFileFetcher,
+    OracleXmlFetcher,
+    ForceMode,
+)
+from datafetcher.redesign.interfaces import FileFetcher, LocatedFile
+from datafetcher.redesign.locations import NgasFile, NgasServer, Location, Cluster, OracleXml
+
+
+class FailNTimesFileFetcher(FileFetcher):
+    """ A failin' fetcher. This dog don't fetch. """
+
+    def __init__(self, n=3):
+        super().__init__()
+        self.failures = n
+
+    @property
+    def file(self) -> LocatedFile:
+        """
+        (Never called)
+
+        :return:
+        """
+
+    def do_fetch(self) -> Path:
+        """
+        Try to fetch N times.
+
+        :return:
+        """
+        if self.failures > 0:
+            self.failures -= 1
+            raise RetryableFetchError("testing failure " + str(self.failures))
+        return Path.home()
+
+    def shares_resources_with(self, other: "FileFetcher") -> bool:
+        """
+        (Never called)
+
+        :return:
+        """
+
+
+@pytest.fixture
+def ngas_file(tmpdir) -> NgasFile:
+    """
+    A fake NGAS file for our tests
+
+    :param tmpdir:
+    :return:
+    """
+    return NgasFile(
+        "fake_ngas_id",
+        tmpdir,
+        "ASDMBinary",
+        -1,
+        "cksum",
+        1,
+        100,
+        NgasServer("localhost", Location.DSOC, Cluster.DSOC),
+    )
+
+
+@pytest.fixture
+def alma_file(tmpdir) -> OracleXml:
+    """
+    A fake ALMA SDM table for our tests
+
+    :param tmpdir:
+    :return:
+    """
+    return OracleXml("fake_uid", "fake_table", tmpdir, "relative_path", 100)
+
+
+def test_force_mode(resource_path_root):
+    """
+    Does force mode overwrite an existing product?
+
+    :param resource_path_root:
+    :return:
+    """
+    # check files that do exist:
+    assert (
+        ForceMode.FORCE.check_destination(
+            resource_path_root / "location_files" / "alma-execblock.json"
+        )
+        is None
+    )
+    with pytest.raises(NormalModeFileExistsError):
+        ForceMode.NORMAL.check_destination(
+            resource_path_root / "location_files" / "alma-execblock.json"
+        )
+
+    # check files that do not exist
+    assert ForceMode.FORCE.check_destination(Path("/this/is/a/bogus/path")) is None
+    assert ForceMode.NORMAL.check_destination(Path("/this/is/a/bogus/path")) is None
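+
+
+# The semantics exercised above, as a minimal sketch (assuming ForceMode is a
+# two-member enum; the behavior is inferred from these assertions, not from
+# the fetchers module itself):
+#
+#     ForceMode.FORCE.check_destination(path)   # never raises, returns None
+#     ForceMode.NORMAL.check_destination(path)  # raises NormalModeFileExistsError
+#                                               # if path already exists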
+
+
+def test_ngas_direct_copy(ngas_file: NgasFile, tmpdir, capsys):
+    """
+    If we can direct copy, we should try to do so; if we can't, an attempt
+    to do so should throw an exception
+
+    :param ngas_file: what we're trying to fetch
+    :param tmpdir: our temporary dir
+    :param capsys:
+    :return:
+    """
+    # make sure that if we could direct copy, we try to
+    with patch("datafetcher.redesign.locations.NgasServer.can_direct_copy_to", return_value=True):
+        with patch("datafetcher.redesign.ngas.NgasConnection.direct_copy", return_value=None):
+            assert NgasDirectCopyFileFetcher(ngas_file).do_fetch() == Path(tmpdir) / "ASDMBinary"
+
+    # contrarily, if we cannot, we should get an exception
+    with patch("datafetcher.redesign.locations.NgasServer.can_direct_copy_to", return_value=False):
+        with patch("datafetcher.redesign.ngas.NgasConnection.direct_copy", return_value=None):
+            with pytest.raises(UnsatisfiableConstraint):
+                NgasDirectCopyFileFetcher(ngas_file).do_fetch()
+
+    capsys.readouterr()
+
+
+def test_ngas_streaming(ngas_file: NgasFile, tmpdir, capsys):
+    """
+    Does streaming write the file to the expected destination?
+
+    :param ngas_file: what we're trying to fetch
+    :param tmpdir: our temporary dir
+    :param capsys:
+    :return:
+    """
+    # make sure that when we stream, the file lands where we expect
+    with patch("datafetcher.redesign.ngas.NgasConnection.stream", return_value=None):
+        assert NgasStreamingFileFetcher(ngas_file).do_fetch() == Path(tmpdir / "ASDMBinary")
+
+    capsys.readouterr()
+
+
+def test_alma_xml(alma_file: OracleXml, tmpdir, capsys):
+    """
+    Can we fetch an ALMA SDM table from Oracle and write it where we expect?
+
+    :param alma_file: the fake SDM table to fetch
+    :param tmpdir: our temporary dir
+    :param capsys:
+    :return:
+    """
+    mock_engine = MagicMock()
+    mock_engine.execute.return_value = [["this is the file content"]]
+    with patch(
+        "datafetcher.redesign.fetchers.OracleXmlFetcher.get_engine", return_value=mock_engine
+    ):
+        assert OracleXmlFetcher(alma_file).do_fetch() == Path(tmpdir / "relative_path")
+
+    capsys.readouterr()
+
+
+# ----------------------------------------------------------------------------
+#
+#    R e t r y i n g   T e s t s
+#
+# ----------------------------------------------------------------------------
+
+
+def test_retrying_succeeds(capsys):
+    """
+    Two failures is OK: the third attempt should succeed.
+
+    :param capsys:
+    :return:
+    """
+    with patch("time.sleep"):
+        fail_twice = MagicMock(wraps=FailNTimesFileFetcher(2))
+        result = RetryableFileFetcher(fail_twice).do_fetch()
+        assert result == Path.home()
+        assert fail_twice.do_fetch.call_count == 3
+    capsys.readouterr()
+
+
+def test_retrying_fails(capsys):
+    """
+    Three failures is too many: the retries should be exhausted.
+
+    :param capsys:
+    :return:
+    """
+    fail_thrice = MagicMock(wraps=FailNTimesFileFetcher(3))
+    with patch("time.sleep"):
+        with pytest.raises(RetryableFetchError):
+            RetryableFileFetcher(fail_thrice).do_fetch()
+    capsys.readouterr()
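+
+# These two tests together imply the retry policy, though neither states it
+# outright: apparently three attempts in total, with (patched-out) sleeps
+# between them. A hedged sketch of the wrapper's observed behavior:
+#
+#     fetcher = RetryableFileFetcher(flaky_fetcher)
+#     fetcher.do_fetch()  # delegates up to 3 times, sleeping between tries,
+#                         # then re-raises the final RetryableFetchError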
diff --git a/apps/cli/executables/datafetcher/tests/redesign/test_locations.py b/apps/cli/executables/datafetcher/tests/redesign/test_locations.py
new file mode 100644
index 0000000000000000000000000000000000000000..b169ab1642bb3bb0206fb223b0b9c4e2438eb675
--- /dev/null
+++ b/apps/cli/executables/datafetcher/tests/redesign/test_locations.py
@@ -0,0 +1,85 @@
+""" Proof of concept: uses marshmallow to load locations report JSON
+    directly into a ClassyLocationsReport!
+"""
+
+# pylint: disable=E0401, E0402, W0611, W0621
+
+from pathlib import Path
+
+from datafetcher.redesign.interfaces import LocationReport
+from datafetcher.redesign.locations import (
+    FileLocator,
+    OracleXml,
+    NgasFile,
+    NgasServer,
+    Location,
+    Cluster,
+)
+
+
+def test_loads_simple_locations_report(cal_json_file: Path):
+    """
+    Can marshmallow load a simple JSON locations report?
+
+    :return:
+    """
+    report = FileLocator(cal_json_file).locate()
+    assert isinstance(report, LocationReport)
+
+
+def test_loads_complex_locations_report(eb_17a_109_18468_locations_file):
+    """
+    Can marshmallow load a large, complex JSON locations report?
+
+    :return:
+    """
+    locations_rpt = FileLocator(eb_17a_109_18468_locations_file).locate()
+    assert isinstance(locations_rpt, LocationReport)
+
+    files = locations_rpt.files
+    assert len(files) == 79
+    assert locations_rpt.aggregate_size == 45572444823
+
+
+def test_loads_alma_locations_report(alma_execblock_report):
+    """
+    Can marshmallow load an ALMA JSON locations report, with its mix of
+    NGAS files and Oracle XML tables?
+
+    :return:
+    """
+    locations_rpt = FileLocator(alma_execblock_report).locate()
+    assert isinstance(locations_rpt, LocationReport)
+
+    assert len(locations_rpt.files) == 1099
+    assert len([file for file in locations_rpt.files if isinstance(file, OracleXml)]) == 36
+    assert len([file for file in locations_rpt.files if isinstance(file, NgasFile)]) == 1063
+    assert locations_rpt.aggregate_size == 4119667881
+
+
+def test_direct_copy_detection():
+    """
+    Ensure that we properly detect the various scenarios in which we can use
+    the direct copy plugin.
+
+    :return:
+    """
+    aoc = Path("/lustre/aoc/foo")
+    naasc = Path("/lustre/naasc/foo")
+    home = Path("/home/nobody/foo")
+
+    # there are two cases we can direct copy to, currently: DSOC -> DSOC
+    # and NAASC -> NAASC (with cluster=DSOC)
+    assert NgasServer("", Location.DSOC, Cluster.DSOC).can_direct_copy_to(aoc)
+    assert NgasServer("", Location.NAASC, Cluster.DSOC).can_direct_copy_to(naasc)
+
+    # none of the other location/cluster/destination permutations will do
+    assert not NgasServer("", Location.NAASC, Cluster.DSOC).can_direct_copy_to(aoc)
+    assert not NgasServer("", Location.DSOC, Cluster.NAASC).can_direct_copy_to(aoc)
+    assert not NgasServer("", Location.DSOC, Cluster.NAASC).can_direct_copy_to(naasc)
+    assert not NgasServer("", Location.NAASC, Cluster.NAASC).can_direct_copy_to(naasc)
+    assert not NgasServer("", Location.NAASC, Cluster.NAASC).can_direct_copy_to(aoc)
+
+    # and of course, we can never direct copy to your house
+    for location in Location:
+        for cluster in Cluster:
+            assert not NgasServer("", location, cluster).can_direct_copy_to(home)
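+
+# Reading the truth table above: direct copy appears to require both that the
+# NGAS server's cluster be DSOC and that the destination sit on the Lustre
+# area matching the server's location. That is an inference from these
+# assertions, not a statement of the production rule.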
diff --git a/apps/cli/executables/datafetcher/tests/redesign/test_ngas.py b/apps/cli/executables/datafetcher/tests/redesign/test_ngas.py
new file mode 100644
index 0000000000000000000000000000000000000000..ea59f9334370244d77838633be2a3a40c77563b0
--- /dev/null
+++ b/apps/cli/executables/datafetcher/tests/redesign/test_ngas.py
@@ -0,0 +1,94 @@
+""" Tests for retrieving products from NGAS by various means """
+import http
+
+# pylint: disable=E0401, E0402, W0621
+
+import pytest
+import requests_mock
+
+from datafetcher.redesign.exceptions import RetryableFetchError, FetchError
+from datafetcher.redesign.locations import NgasFile, NgasServer, Location, Cluster
+
+LOCALHOST_URL = "http://localhost/RETRIEVE"
+
+
+@pytest.fixture
+def ngas_file(tmpdir) -> NgasFile:
+    """
+    A fake NGAS file for our tests
+
+    :param tmpdir:
+    :return:
+    """
+    return NgasFile(
+        "fake_ngas_id",
+        tmpdir,
+        "ASDMBinary",
+        -1,
+        "cksum",
+        1,
+        100,
+        NgasServer("localhost", Location.DSOC, Cluster.DSOC),
+    )
+
+
+@pytest.fixture
+def ngams_error() -> str:
+    """
+    A canned NGAMS failure response for our tests
+
+    :return:
+    """
+    return """<?xml version="1.0" encoding="UTF-8"?>
+    <!DOCTYPE NgamsStatus SYSTEM "http://nmngas01.aoc.nrao.edu:7777/RETRIEVE?internal=ngamsStatus.dtd">
+    <NgamsStatus>
+    <Status CompletionTime="2019-06-21T10:40:59.536" Date="2019-06-21T10:40:59.538" HostId="nmngas01"
+            LastRequestStatUpdate="2019-06-21T10:40:59.536"
+            Message="NGAMS_ER_RETRIEVE_CMD:4018:ERROR: Incorrect parameter given for RETRIEVE command."
+            RequestId="705545" RequestTime="2019-06-21T10:40:59.528" State="ONLINE" Status="FAILURE"
+            SubState="IDLE" Version="v4.2.2-ALMA/2013-12-101T16:00:00"/>
+    </NgamsStatus>"""
+
+
+def test_direct_copy(ngas_file: NgasFile, ngams_error):
+    """
+    Can we retrieve a file from NGAS via the direct-copy plugin?
+
+    :param ngas_file: the file to retrieve
+    :param ngams_error: the text of the expected response
+    :return:
+    """
+    # test the happy path
+    with requests_mock.Mocker() as mock:
+        mock.get(LOCALHOST_URL, status_code=http.HTTPStatus.OK, text="Successfully retrieved")
+        ngas_file.direct_copy()
+        assert mock.called
+        assert mock.call_count == 1
+
+    # let's ensure that a 500 error throws a retryable exception
+    with requests_mock.Mocker() as mock:
+        mock.get(LOCALHOST_URL, status_code=http.HTTPStatus.INTERNAL_SERVER_ERROR, text=ngams_error)
+        with pytest.raises(RetryableFetchError):
+            ngas_file.direct_copy()
+
+    # let's ensure that a 404 error throws a non-retryable exception
+    with requests_mock.Mocker() as mock:
+        mock.get(LOCALHOST_URL, status_code=http.HTTPStatus.NOT_FOUND, text=ngams_error)
+        with pytest.raises(FetchError):
+            ngas_file.direct_copy()
+
+
+def test_streaming(ngas_file: NgasFile, ngams_error):
+    """
+    Can we stream a file from NGAS?
+
+    :param ngas_file: the file to retrieve
+    :param ngams_error: the text of the expected response
+    :return:
+    """
+    with requests_mock.Mocker() as mock:
+        mock.get(LOCALHOST_URL, status_code=http.HTTPStatus.OK, text="Successfully retrieved")
+        ngas_file.stream()
+        assert mock.called
+        assert mock.call_count == 1
+
+    # let's ensure that a 500 error throws an exception
+    with requests_mock.Mocker() as mock:
+        mock.get(LOCALHOST_URL, status_code=http.HTTPStatus.INTERNAL_SERVER_ERROR, text=ngams_error)
+        with pytest.raises(FetchError):
+            ngas_file.stream()
+
+    # a 404 should throw an exception too (we assert only the FetchError base class here)
+    with requests_mock.Mocker() as mock:
+        mock.get(LOCALHOST_URL, status_code=http.HTTPStatus.NOT_FOUND, text=ngams_error)
+        with pytest.raises(FetchError):
+            ngas_file.stream()
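+
+# The error-handling contract these tests pin down: HTTP 200 succeeds, a 500
+# from NGAS is worth retrying (RetryableFetchError, at least for direct copy),
+# and a 404 is not. The streaming tests assert only the FetchError base class,
+# so they deliberately stay agnostic about retryability.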
diff --git a/apps/cli/executables/datafetcher/tests/redesign/test_product_fetcher.py b/apps/cli/executables/datafetcher/tests/redesign/test_product_fetcher.py
new file mode 100644
index 0000000000000000000000000000000000000000..41571c77e1e50f22ed14db6ca40cc2068635abd5
--- /dev/null
+++ b/apps/cli/executables/datafetcher/tests/redesign/test_product_fetcher.py
@@ -0,0 +1,107 @@
+""" Tests for the product fetcher """
+
+# pylint: disable=C0103, E0401, E0402, E1136, R0201, R0903
+
+from pathlib import Path
+from typing import List, Union
+
+import pytest
+
+from datafetcher.redesign.fetchers import DryRunFakeFileFetcher, ForceMode
+from datafetcher.redesign.interfaces import FetcherFactory, FileFetcher, LocatedFile, FetchPlan
+from datafetcher.redesign.locations import FileLocator, ServiceLocator
+from datafetcher.redesign.product_fetcher import FetchContext, CLIParam
+
+
+class FakePlan(FetchPlan):
+    """ A non-fetching fetch plan """
+
+    def __init__(self, fetchers, concurrency):
+        self.fetchers = fetchers
+        self.concurrency = concurrency
+
+    def fetch(self):
+        # Let's not actually fetch anything today
+        pass
+
+
+class FakeFactory(FetcherFactory):
+    """ Creates non-fetching fetchers """
+
+    def fetch_ngas(self, file: LocatedFile) -> FileFetcher:
+        return DryRunFakeFileFetcher(file)
+
+    def fetch_oracle_xml(self, file: LocatedFile) -> FileFetcher:
+        return DryRunFakeFileFetcher(file)
+
+    def fetch_plan(
+        self, fetchers: List[Union[FileFetcher, FetchPlan]], concurrency: int = 1
+    ) -> FetchPlan:
+        return FakePlan(fetchers, concurrency)
+
+
+def test_plan_generation(resource_path_root):
+    """
+    Can we generate a fetch plan?
+
+    :param resource_path_root:
+    :return:
+    """
+    report = FileLocator(resource_path_root / "location_files" / "13B-014.json").locate()
+    fake_factory = FakeFactory()
+    fetchers = report.fetchers(fake_factory)
+
+    # we should absolutely have the same number of fetchers as we have files
+    assert len(fetchers) == len(report.files)
+
+    # let's make a plan
+    plan = FetchContext.calculate_plan(fake_factory, fetchers)
+
+    # we should have the same number of fetchers in the plan as in the original report
+    assert sum(len(subplan.fetchers) for subplan in plan.fetchers) == len(report.files)
+
+
+def test_argument_parsing(capsys):
+    """
+    Can we parse the command-line arguments passed in?
+
+    :param capsys:
+    :return:
+    """
+    # ensure that various combinations do not work
+    with pytest.raises(SystemExit):
+        # must have an SPL or a file
+        FetchContext.parse_commandline([])
+
+    with pytest.raises(SystemExit):
+        # cannot have both an SPL and a file
+        FetchContext.parse_commandline(
+            [CLIParam.FILE.value, "foo", CLIParam.SPL.value, "uid://this/is/a/fakesy"]
+        )
+
+    # check the dry run value
+    fc = FetchContext.parse_commandline([CLIParam.DRY.value, CLIParam.FILE.value, "foo"])
+    assert fc.dry_run
+    fc = FetchContext.parse_commandline([CLIParam.FILE.value, "foo"])
+    assert not fc.dry_run
+
+    # check the force value
+    fc = FetchContext.parse_commandline([CLIParam.FORCE.value, CLIParam.FILE.value, "foo"])
+    assert fc.force == ForceMode.FORCE
+    fc = FetchContext.parse_commandline([CLIParam.FILE.value, "foo"])
+    assert fc.force == ForceMode.NORMAL
+
+    # a file argument should give us a FileLocator
+    fc = FetchContext.parse_commandline([CLIParam.FILE.value, "foo"])
+    assert isinstance(fc.locator, FileLocator)
+    assert fc.locator.file == Path("foo")
+
+    # an SPL argument should give us a ServiceLocator
+    fc = FetchContext.parse_commandline([CLIParam.SPL.value, "uid://this/is/a/fakesy"])
+    assert isinstance(fc.locator, ServiceLocator)
+    assert fc.locator.spl == "uid://this/is/a/fakesy"
+
+    # and the concurrency argument should be parsed as an int
+    fc = FetchContext.parse_commandline(
+        [CLIParam.FILE.value, "foo", CLIParam.CONCURRENCY.value, "732"]
+    )
+    assert fc.concurrency == 732
+
+    capsys.readouterr()
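+
+# One structural assumption made explicit: calculate_plan() evidently returns
+# a plan whose fetchers are themselves subplans -- hence the sum over
+# subplan.fetchers above -- so the fetcher count is conserved even though the
+# grouping itself is opaque to this test.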
diff --git a/apps/cli/executables/datafetcher/tests/redesign/test_validators.py b/apps/cli/executables/datafetcher/tests/redesign/test_validators.py
new file mode 100644
index 0000000000000000000000000000000000000000..0a7721831b6977756baab4c0b13aca94afabe82d
--- /dev/null
+++ b/apps/cli/executables/datafetcher/tests/redesign/test_validators.py
@@ -0,0 +1,65 @@
+""" Tests for our size, checksum, and SDM table XML validators """
+import pathlib
+
+# pylint: disable=E0401, E0402
+
+import pytest
+
+from datafetcher.redesign.exceptions import FileValidationFault
+from datafetcher.redesign.validators import Crc32Validator, SizeValidator, XmlWellFormedValidator
+
+
+def test_size_validator(resource_path_root: pathlib.Path, capsys):
+    """
+    Does the size validator function as expected?
+
+    :param resource_path_root:
+    :param capsys:
+    :return:
+    """
+    # this should work just fine
+    SizeValidator(1024).validate(resource_path_root / "validators" / "random-junk")
+
+    # these should not
+    with pytest.raises(FileValidationFault):
+        SizeValidator(1023).validate(resource_path_root / "validators" / "random-junk")
+
+    with pytest.raises(FileValidationFault):
+        SizeValidator(1025).validate(resource_path_root / "validators" / "random-junk")
+    capsys.readouterr()
+
+
+def test_checksum_validator(resource_path_root: pathlib.Path, capsys):
+    """
+    Does the checksum validator function as expected?
+
+    :param resource_path_root:
+    :param capsys:
+    :return:
+    """
+    # this should work just fine
+    Crc32Validator(457325410).validate(resource_path_root / "validators" / "random-junk")
+
+    # this should raise an error
+    with pytest.raises(FileValidationFault):
+        Crc32Validator(457325411).validate(resource_path_root / "validators" / "random-junk")
+
+    # so should this
+    with pytest.raises(FileValidationFault):
+        Crc32Validator(-1).validate(resource_path_root / "validators" / "random-junk")
+    capsys.readouterr()
+
+
+def test_xml_validator(resource_path_root: pathlib.Path, capsys):
+    """
+    Determine whether the XML we pulled down is well-formed.
+
+    :param resource_path_root:
+    :param capsys:
+    :return:
+    """
+    XmlWellFormedValidator().validate(resource_path_root / "validators" / "valid.xml")
+
+    with pytest.raises(FileValidationFault):
+        XmlWellFormedValidator().validate(resource_path_root / "validators" / "invalid.xml")
+    capsys.readouterr()
diff --git a/apps/cli/executables/datafetcher/tests/testresources/location_files/13B-014.json b/apps/cli/executables/datafetcher/tests/testresources/location_files/13B-014.json
new file mode 100644
index 0000000000000000000000000000000000000000..e116a1ddcc1e0f3d11ed55484abbcd0c6dd24a60
--- /dev/null
+++ b/apps/cli/executables/datafetcher/tests/testresources/location_files/13B-014.json
@@ -0,0 +1 @@
+{"files": [{"ngas_file_id": "uid____evla_bdf_1391196419753.bdf", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "ASDMBinary/uid____evla_bdf_1391196419753", "checksum": "-1792119289", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 322518851, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid____evla_bdf_1391196420240.bdf", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "ASDMBinary/uid____evla_bdf_1391196420240", "checksum": "-2034697032", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 71674379, "server": {"server": "nmngas03.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414978.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "ASDM.xml", "checksum": "1071098596", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 7557, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414979.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Antenna.xml", "checksum": "-667519693", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 10905, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414980.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "CalData.xml", "checksum": "-1661730835", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 380, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414981.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "CalDevice.xml", "checksum": "-1941244872", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 200434, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414982.sdm", "subdirectory":
"13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "CalPointing.xml", "checksum": "471473940", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 392, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414983.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "CalReduction.xml", "checksum": "-1703882883", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 395, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414984.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "ConfigDescription.xml", "checksum": "-742902158", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 1978, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414985.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "CorrelatorMode.xml", "checksum": "768423631", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 897, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414986.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "DataDescription.xml", "checksum": "1153040305", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 6774, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414987.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Doppler.xml", "checksum": "-1344464921", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 380, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414988.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "ExecBlock.xml", "checksum": "829018431", "checksum_type": "ngamsGenCrc32", "version": 2, "size": 2187, "server": {"server": "nmngas01.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414989.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Feed.xml", "checksum": "1586490751", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 261301, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414990.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Field.xml", "checksum": "-1972937667", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 775, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414991.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Flag.xml", "checksum": "1430321664", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 14829, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414992.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Main.xml", "checksum": "1682700919", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 2193, "server": 
{"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414993.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Pointing.xml", "checksum": "-1935543762", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 383, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414994.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "PointingModel.xml", "checksum": "-1404522861", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 11107, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414995.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Polarization.xml", "checksum": "-531542200", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 804, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414996.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Processor.xml", "checksum": "496514312", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 612, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414997.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Receiver.xml", "checksum": "-2004447590", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 7029, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414998.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "SBSummary.xml", "checksum": "-1665712816", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 1500, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196414999.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Scan.xml", "checksum": "1284094101", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 1328, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196415000.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Source.xml", "checksum": "1277850579", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 7855, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196415001.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "SpectralWindow.xml", "checksum": "-1023258867", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 13253, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196415002.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "State.xml", "checksum": "-1494809656", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 551, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196415003.sdm", "subdirectory": 
"13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Station.xml", "checksum": "660945815", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 5871, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196415004.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Subscan.xml", "checksum": "-1515730135", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 1306, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196415005.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "SwitchCycle.xml", "checksum": "-1661885475", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 697, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196415006.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "SysCal.xml", "checksum": "602414074", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 377, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196415007.bin", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "SysPower.bin", "checksum": "870209040", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 2834148, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}, {"ngas_file_id": "uid___evla_sdm_X1391196415008.sdm", "subdirectory": "13B-014.sb28677984.eb28693176.56688.810349050924", "relative_path": "Weather.xml", "checksum": "-1608402443", "checksum_type": "ngamsGenCrc32", "version": 1, "size": 1236, "server": {"server": "nmngas04.aoc.nrao.edu:7777", "location": "DSOC", "cluster": "DSOC"}}], "aggregate_size": 397592664} \ No newline at end of file diff --git a/apps/cli/executables/datafetcher/tests/testresources/validators/invalid.xml b/apps/cli/executables/datafetcher/tests/testresources/validators/invalid.xml new file mode 100644 index 0000000000000000000000000000000000000000..57a3325f0a4c79b9d08e2403bb024d9fb42ef860 --- /dev/null +++ b/apps/cli/executables/datafetcher/tests/testresources/validators/invalid.xml @@ -0,0 +1,5 @@ +<html> + <head> + <body>This is not + </head> +</html> \ No newline at end of file diff --git a/apps/cli/executables/datafetcher/tests/testresources/validators/random-junk b/apps/cli/executables/datafetcher/tests/testresources/validators/random-junk new file mode 100644 index 0000000000000000000000000000000000000000..1cb316ee8d419fb06d0a219aa3286593b7fba002 Binary files /dev/null and b/apps/cli/executables/datafetcher/tests/testresources/validators/random-junk differ diff --git a/apps/cli/executables/datafetcher/tests/testresources/validators/valid.xml b/apps/cli/executables/datafetcher/tests/testresources/validators/valid.xml new file mode 100644 index 0000000000000000000000000000000000000000..c460ff3938247ce6e7cd85bf828df0a2e406f3a3 --- /dev/null +++ b/apps/cli/executables/datafetcher/tests/testresources/validators/valid.xml @@ -0,0 +1,5 @@ +<html> + <head> + <body>This is real XML so far</body> + </head> +</html> \ No newline at end of file diff --git a/services/workflow/requirements.txt b/services/workflow/requirements.txt index eccade3d780bac8bd88fb8e4a9f34008e066aefd..b9f8bf5b24c90de2fa0acceed809597654a0f8df 100644 --- 
--- a/services/workflow/requirements.txt
+++ b/services/workflow/requirements.txt
@@ -25,6 +25,7 @@ pyramid_tm == 2.2.1
 pytest == 5.4.3
 pytest-resource-path == 1.2.1
 requests == 2.23
+requests_mock == 1.9.2
 sqlalchemy == 1.3.23
 waitress == 1.4
 zope.sqlalchemy == 1.1