Compare revisions for ssa/workspaces: 6 commits on source, 539 additions and 181 deletions.
# CASA Envoy: The Workspaces CASA Processing System
CASA Envoy is responsible for environment setup and launch of all types of CASA processing for the Workspaces System.
Currently, this includes Standard Calibration, CMS Restores, Standard CMS Imaging, and CMS Restore-and-Image.
```
usage: casa_envoy [-h] [-c STANDARD_CAL STANDARD_CAL] [-i STANDARD_IMG STANDARD_IMG] [--restore] [--integrated]
Workspaces CASA processing launcher
optional arguments:
-h, --help show this help message and exit
-c, --standard-cal run the standard calibration CASA pipeline
-i, --standard-img run the standard imaging CASA pipeline
--restore run the restore measurement set CASA pipeline, use in conjunction with '-c'
--integrated run an integrated calibration-imaging pipeline, use in conjunction with '-i'
```
While initiating CASA is relatively straightforward and essentially identical between use cases, there are variations in
the required PPR structure and the initial location of input data. For this reason, CASA Envoy's functionality
can be broken into two underlying parts: Setup and Launch.
## Setup
Setup can be further divided into three essential parts: Auditing, Environment, and Data.
### Auditing
There are two types of auditing that occur in CASA Envoy. The first, Directory Auditing, ensures that the processing
directory contains the required *rawdata*, *working*, and *products* subdirectories.
The second type is File Auditing. CASA Envoy must be provided two different files at its initial call, *metadata.json* and *PPR.xml*, and these
files are audited to ensure that all required fields are present. *PPR.xml* is submitted directly to CASA, so it is important
to make sure that it is structured correctly before submission. While *metadata.json* is not submitted to CASA, it
contains all the information needed for results delivery after CASA completes and must be audited as well.
A further function of File Auditing is correcting *PPR.xml* for processing with HTCondor. CASA requires the name of
the processing directory it runs in; unfortunately, with HTCondor that name cannot be known before the
condor job is submitted. Therefore, *PPR.xml* is corrected with the proper directory information after submission to
condor. The corrected *PPR.xml* is placed into the *working* directory, where it is then used by CASA, and the
unaltered original remains in the parent processing directory.
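A minimal sketch of the two audit steps, assuming the directory layout described above; the function names and the example field list are illustrative, not the exact `casa_envoy.auditor` API.
```python
from pathlib import Path
from typing import List


def audit_directories(processing_dir: Path) -> bool:
    """Directory audit: verify rawdata/, working/, and products/ all exist."""
    return all((processing_dir / sub).is_dir() for sub in ("rawdata", "working", "products"))


def audit_file(path: Path, required_fields: List[str]) -> bool:
    """File audit: verify every required field appears somewhere in the file."""
    content = path.read_text()
    return all(field in content for field in required_fields)


# Hypothetical usage; the real required-field lists depend on the product type:
# ok = audit_directories(Path("/path/to/processing_dir")) and audit_file(
#     Path("metadata.json"), ["productLocator", "workflowName"]
# )
```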
### Environment
CASA requires several environment variables to be set by whatever system it is running on.
These include:
| ENV Variables | Description |
|--- |--- |
| SCIPIPE_ROOTDIR | location of the processing directory |
| CASA_HOME | location of CASA version to use for processing |
| PPR_FILENAME | name of the PPR file (usually PPR.xml, but not guaranteed) |
| LANG | HTCondor does not set the locale correctly for CASA, so it is set explicitly here |
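A minimal sketch of the environment setup, assuming a settings dictionary parsed from the processing request; the `LANG` value shown is an assumption for illustration, not necessarily the exact value CASA Envoy exports.
```python
import os


def setup_environment(parameters: dict) -> None:
    """Export the environment variables CASA expects before launch."""
    os.environ["SCIPIPE_ROOTDIR"] = parameters["rootDirectory"]
    os.environ["CASA_HOME"] = parameters["homeForReprocessing"]
    os.environ["PPR_FILENAME"] = parameters["ppr"]
    # HTCondor does not hand CASA a usable locale; the value here is illustrative.
    os.environ["LANG"] = "en_US.UTF-8"
```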
### Data
Usually the starting location for data is *rawdata*. This is the location where data is always placed by the
Workspaces Product Fetcher. For Standard Calibrations and Standard CMS Imaging this is correct. However, for
calibration restores, the calibration tar file needs to be extracted into the *products* directory. This step ensures
that all input data is in the correct location pre-processing for CASA to find when required.
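A minimal sketch of the restore-specific data placement (the full `RestoreFoundation` class appears in the diff below); the assumption that the calibration tar name is the second entry in the metadata's `fileSetIds` list matches the example metadata shown later, and the paths are illustrative.
```python
import json
import tarfile
from pathlib import Path


def extract_calibration(metadata_path: Path, processing_dir: Path) -> None:
    """Extract the downloaded calibration tar from rawdata/ into products/."""
    cal_name = json.loads(metadata_path.read_text())["fileSetIds"][1]
    cal_tar = processing_dir / "rawdata" / cal_name
    with tarfile.open(cal_tar) as calibration:
        calibration.extractall(path=processing_dir / "products")
```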
## Launch
Since the only difference between calibration and imaging processing is the contents of *PPR.xml*, CASA Envoy contains
a single CASA Launcher class which is used by the two type-specific launcher classes: Calibration Launcher and Imaging
Launcher. Each type launcher handles both the standard and the restore or integrated flavor of its processing type. The two type classes ensure that setup is correct for each product type, as described above, and then call
the CASA Launcher.
<br/>
<br/>
After CASA processing completes, the CASA log is checked for error flags and CASA Envoy exits.
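The overall launch flow, condensed into a hedged sketch; the class names mirror `casa_envoy.launchers` (shown in the diff below), but the bodies here are simplified and the `run_casa`/`check_logs` helpers are assumptions for illustration.
```python
import logging


class CasaLauncher:
    """Shared launcher: set up the environment, run CASA, then inspect the log."""

    def __init__(self, parameters: dict):
        self.parameters = parameters
        self.logger = logging.getLogger("casa_envoy")

    def run(self) -> None:
        self.setup_environment()  # export SCIPIPE_ROOTDIR, CASA_HOME, PPR_FILENAME, LANG
        self.run_casa()           # hypothetical helper: invoke CASA on the corrected PPR.xml
        self.check_logs()         # hypothetical helper: scan the CASA log for error flags

    def setup_environment(self) -> None:
        ...

    def run_casa(self) -> None:
        ...

    def check_logs(self) -> None:
        ...


class CalibrationLauncher:
    """Type launcher: verify inputs, lay the data foundation, then delegate."""

    def __init__(self, parameters: dict):
        self.parameters = parameters

    def launch_casa(self) -> None:
        # a restore job would first check the calibration SPL and extract the tar file
        CasaLauncher(self.parameters).run()
```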
......@@ -34,15 +34,21 @@ def get_fields_for(product_type: str, filename: str) -> list:
"sdmId",
]
restore_metadata_list = [
"calProductLocator",
]
ppr_list = ["RootDirectory", "RelativePath", "SdmIdentifier"]
if ".xml" in filename:
return ppr_list
if ".json" in filename and "cal" in product_type:
if ".json" in filename and "cal" in product_type or product_type == "integrated":
return cal_metadata_list
elif ".json" in filename and "img" in product_type:
return img_metadata_list
elif ".json" in filename and product_type == "restore":
return cal_metadata_list + restore_metadata_list
def get_xml_content(file: AbstractTextFile):
......@@ -126,10 +132,12 @@ class AuditFiles(AuditorIF):
class AuditDirectories(AuditorIF):
def __init__(self, ppr: AbstractTextFile, settings: Dict[str, str]):
self.logger = logging.getLogger("casa_envoy")
self.parameters = settings
self.rootDirectory = settings["rootDirectory"]
self.relative_path = settings["processingDirectory"]
self.sdmId = get_value_for(ppr, "SdmIdentifier")
@property
def audit(self) -> bool:
current = os.getcwd()
needed = self.rootDirectory + "/" + self.relative_path
......@@ -146,11 +154,21 @@ class AuditDirectories(AuditorIF):
data = os.listdir(Path(current + "/rawdata/"))
if len(data) > 0:
self.logger.info("Data is available. Proceeding...")
if self.parameters["product_type"] is "restore":
self.logger.info("Checking products/ for calibration tables...")
cal_data = os.listdir(Path(current + "/products/"))
if len(cal_data) > 0:
self.logger.info("Calibration data is available. Proceeding...")
else:
self.logger.error("FAILURE: calibration data not found in products/")
return False
return True
else:
self.logger.info("FAILURE: data not found in rawdata/")
self.logger.error("FAILURE: data not found in rawdata/")
return False
else:
self.logger.info(
self.logger.error(
"DIRECTORY ERROR: A directory is missing from the processing root directory."
)
"""
Classes and methods for laying the data location foundation for various types of CASA processing
"""
import logging
import os
import tarfile
from pathlib import Path
import json
from casa_envoy.schema import AbstractTextFile
from casa_envoy.interfaces import FoundationIF
class RestoreFoundation(FoundationIF):
def __init__(self, parameters: dict, metadata: AbstractTextFile):
self.logger = logging.getLogger("casa_envoy")
self.parameters = parameters
self.parent_path = parameters["parent_path"]
self.metadata = metadata
def data_foundation(self):
"""
CMS Restore requires two inputs: An EB and a Calibration for that EB
After download, all data is in rawdata and the calibration tables are contained
in a tar file. We need to extract all the calibration tables to the products directory
for CASA processing.
:return:
"""
self.logger.info("LAYING DATA FOUNDATION...")
# ensure we're starting from the parent directory
os.chdir(self.parent_path)
self.extract_cal()
self.set_permissions()
self.logger.info("DATA FOUNDATION COMPLETE!")
def extract_cal(self):
self.logger.info("Extracting calibration tar file to products directory...")
cal_name = json.loads(self.metadata.content)["fileSetIds"][1]
cal_path = "./rawdata" + cal_name
if Path(cal_path).exists():
calibration = tarfile.open(cal_path)
# extract to products
calibration.extractall(path="./products")
calibration.close()
else:
self.logger.error(f"ERROR: calibration tar file {cal_name} not found in rawdata!")
def set_permissions(self):
self.logger.info("Ensuring correct file permissions....")
path = Path("./products")
for root, dirs, files in os.walk(path):
for d in dirs:
os.chmod(os.path.join(root, d), 0o755)
for f in files:
os.chmod(os.path.join(root, f), 0o755)
......@@ -26,5 +26,18 @@ class AuditorIF(ABC):
Generic functionality implementation for auditor classes
"""
@abc.abstractmethod
def audit(self):
raise NotImplementedError
pass
class FoundationIF(ABC):
"""
Generic Foundation methods
Should be implemented for any type of CASA processing that requires initial data placement
in locations other than rawdata after download
"""
@abc.abstractmethod
def data_foundation(self):
pass
......@@ -9,6 +9,7 @@ from typing import Dict
import json
from casa_envoy.auditor import AuditFiles, AuditDirectories
from casa_envoy.foundation import RestoreFoundation
from casa_envoy.interfaces import LauncherIF
from casa_envoy.schema import AbstractTextFile
......@@ -102,23 +103,49 @@ class CalibrationLauncher(LauncherIF):
self.metadata = get_abs_file(parameters.get("metadata"))
def launch_casa(self):
if self.check_calibratable():
if self.parameters["product_type"] == "restore":
check_input = self.check_restorable()
else:
check_input = self.check_calibratable()
if check_input:
self.prepare_for_casa()
self.run_audit(self.parameters)
CasaLauncher(self.parameters).run()
else:
self.logger.error("ERROR: Provided SPL is not type execution block!")
self.logger.error("ERROR: Provided SPL(s) are not correct type(s) for processing!")
sys.exit(1)
def prepare_for_casa(self):
# Ensure all data is in the required locations for CASA processing (This is not always rawdata!)
if self.parameters["product_type"] == "restore":
RestoreFoundation(self.parameters, self.metadata).data_foundation()
return
def check_calibratable(self) -> bool:
spl = self.metadata.content["productLocator"]
if "execblock" in spl:
return True
else:
self.logger.info("SPL ERROR: This product locator is not calibratable!")
self.logger.error(
"SPL ERROR: This product locator is not calibratable! Please check your inputs"
)
return False
def check_restorable(self) -> bool:
spl = self.metadata.content["productLocator"]
cal_spl = self.metadata.content["calProductLocator"]
if "execblock" in spl and "calibration" in cal_spl:
return True
else:
self.logger.error(
"SPL ERROR: This set of product locators is not restorable! Please check your inputs"
)
return False
def run_audit(self, parameters: Dict[str, str]):
dir_audit = AuditDirectories(self.ppr, parameters).audit()
dir_audit = AuditDirectories(self.ppr, parameters).audit
if dir_audit:
self.logger.info("Directory audit successful!")
else:
......@@ -141,7 +168,12 @@ class ImagingLauncher(LauncherIF):
self.metadata = get_abs_file(parameters.get("metadata"))
def launch_casa(self):
if self.check_imageable():
if self.parameters["product_type"] == "integrated":
check_input = self.check_cal_and_imageable()
else:
check_input = self.check_imageable()
if check_input:
self.run_audit(self.parameters)
CasaLauncher(self.parameters).run()
else:
......@@ -158,8 +190,30 @@ class ImagingLauncher(LauncherIF):
self.logger.info("CMS ERROR: Imaging requires a valid CMS name and location!")
return False
def check_cal_and_imageable(self) -> bool:
wf_name = self.metadata.content["workflowName"]
spl = self.metadata.content["productLocator"]
if "restore" in wf_name:
cal_spl = self.metadata.content["calProductLocator"]
if "execblock" in spl and "calibration" in cal_spl:
return True
else:
self.logger.error(
"SPL ERROR: This set of product locators are not restorable! Please check your inputs"
)
elif "cal" in wf_name:
if "execblock" in spl:
return True
else:
self.logger.error(
"SPL ERROR: This product locator is not calibratable! Please check your inputs"
)
return False
def run_audit(self, parameters: Dict[str, str]):
dir_audit = AuditDirectories(self.ppr, parameters).audit()
dir_audit = AuditDirectories(self.ppr, parameters).audit
if dir_audit:
self.logger.info("Directory audit successful!")
else:
......
......@@ -50,6 +50,7 @@ def arg_parser() -> argparse.ArgumentParser:
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"-c",
"--standard-cal",
nargs=2,
action="store",
......@@ -57,11 +58,24 @@ def arg_parser() -> argparse.ArgumentParser:
help="run the standard calibration CASA pipeline",
)
parser.add_argument(
"-i",
"--standard-img",
nargs=2,
action="store",
required=False,
help="run the standard calibration CASA pipeline",
help="run the standard imaging CASA pipeline",
)
parser.add_argument(
"--restore",
required=False,
action="store_true",
help="run the restore measurement set CASA pipeline, use in conjunction with '-c'",
)
parser.add_argument(
"--integrated",
required=False,
action="store_true",
help="run an integrated calibration-imaging pipeline, use in conjunction with '-i'",
)
return parser
......@@ -73,7 +87,10 @@ def main():
if args.standard_cal is not None:
parameters = _get_settings(pathlib.Path(path), args.standard_cal)
parameters["product_type"] = "standard-cal"
if args.restore:
parameters["product_type"] = "restore"
else:
parameters["product_type"] = "standard-cal"
CalibrationLauncher(parameters).launch_casa()
# make sure we return to the parent directory after processing
......@@ -81,7 +98,10 @@ def main():
elif args.standard_img is not None:
parameters = _get_settings(pathlib.Path(path), args.standard_img)
parameters["product_type"] = "standard-img"
if args.integrated:
parameters["product_type"] = "integrated"
else:
parameters["product_type"] = "standard-img"
ImagingLauncher(parameters).launch_casa()
# return to parent directory after processing
......
{
"fileSetIds": ["brain_000.58099.67095825232", "calibration.tar"],
"workflowName": "restore_cms",
"systemId": "2",
"productLocator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
"calProductLocator": "uid://evla/calibration/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
"projectMetadata": {
"projectCode": "Operations",
"title": "",
"startTime": "58099.6710792824",
"observer": "VLA Operations"
},
"destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpabcd1234"
}
......@@ -18,13 +18,15 @@ cal_settings = {
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
"metadata": "test/test.json",
"ppr": "test/PPR.xml",
"metadata": "test/input_files/test.json",
"ppr": "test/input_files/PPR.xml",
"product_type": "standard-cal",
}
test_ppr = AbstractTextFile(filename="test/PPR.xml", content=Path("test/PPR.xml").read_text())
test_ppr = AbstractTextFile(
filename="test/input_files/PPR.xml", content=Path("test/input_files/PPR.xml").read_text()
)
test_cal_metadata = AbstractTextFile(
filename="test/test.json", content=Path("test/test.json").read_text()
filename="test/input_files/test.json", content=Path("test/input_files/test.json").read_text()
)
img_settings = {
......@@ -32,15 +34,17 @@ img_settings = {
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
"metadata": "test/image-metadata.json",
"ppr": "test/cmsimage-PPR.xml",
"metadata": "test/input_files/image-metadata.json",
"ppr": "test/input_files/cmsimage-PPR.xml",
"product_type": "standard-img",
}
test_img_ppr = AbstractTextFile(
filename="test/cmsimage-PPR.xml", content=Path("test/cmsimage-PPR.xml").read_text()
filename="test/input_files/cmsimage-PPR.xml",
content=Path("test/input_files/cmsimage-PPR.xml").read_text(),
)
test_img_metadata = AbstractTextFile(
filename="test/image-metadata.json", content=Path("test/image-metadata.json").read_text()
filename="test/input_files/image-metadata.json",
content=Path("test/input_files/image-metadata.json").read_text(),
)
......@@ -138,7 +142,7 @@ class TestAuditFiles:
ppr = AuditFiles([test_cal_metadata, test_ppr], cal_settings).correct_for_condor(
ppr=test_ppr
)
assert ppr.filename == "test/PPR.xml"
assert ppr.filename == "test/input_files/PPR.xml"
def test_audit(self):
result = AuditFiles([test_cal_metadata, test_ppr], cal_settings).audit()
......@@ -147,5 +151,5 @@ class TestAuditFiles:
class TestAuditDirectories:
def test_audit(self):
result = AuditDirectories(test_ppr, cal_settings).audit()
result = AuditDirectories(test_ppr, cal_settings).audit
assert result is False
......@@ -12,8 +12,8 @@ expected_settings = {
"rootDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool",
"processingDirectory": "tmpo1ca1pp_",
"parent_path": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_",
"metadata": "test/test.json",
"ppr": "test/PPR.xml",
"metadata": "test/input_files/test.json",
"ppr": "test/input_files/PPR.xml",
"product_type": "standard-cal",
}
args = argparse.Namespace()
......@@ -21,7 +21,8 @@ args = argparse.Namespace()
class TestPalaver:
def test_get_settings(self):
args.standard_cal = ["test/test.json", "test/PPR.xml"]
args.standard_cal = ["test/input_files/test.json", "test/input_files/PPR.xml"]
args.restore = False
with patch(
"pathlib.Path.cwd",
......@@ -40,7 +41,8 @@ class TestPalaver:
@patch("os.chdir")
@patch("os.getcwd")
def test_main_cal(self, mock_cwd, mock_chdir):
args.standard_cal = ["test/test.json", "test/PPR.xml"]
args.standard_cal = ["test/input_files/test.json", "test/input_files/PPR.xml"]
args.integrated = False
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
with patch("casa_envoy.launchers.CalibrationLauncher.launch_casa") as cal_launcher:
......@@ -52,7 +54,10 @@ class TestPalaver:
@patch("os.chdir")
@patch("os.getcwd")
def test_main_img(self, mock_cwd, mock_chdir):
args.standard_img = ["test/image-metadata.json", "test/cmsimage-PPR.xml"]
args.standard_img = [
"test/input_files/image-metadata.json",
"test/input_files/cmsimage-PPR.xml",
]
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
with patch("casa_envoy.launchers.ImagingLauncher.launch_casa") as img_launcher:
......
"""
Tests for casa_envoy.foundation
"""
from pathlib import Path
from unittest.mock import patch
from casa_envoy.schema import AbstractTextFile
from casa_envoy.foundation import RestoreFoundation
parameters = expected_settings = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool",
"processingDirectory": "tmpo1ca1pp_",
"parent_path": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_",
"metadata": "test/input_files/restore.json",
"ppr": "test/input_files/PPR.xml",
"product_type": "restore",
}
test_restore_metadata = AbstractTextFile(
filename="test/input_files/restore.json",
content=Path("test/input_files/restore.json").read_text(),
)
foundation = RestoreFoundation(parameters=parameters, metadata=test_restore_metadata)
class TestRestoreFoundation:
@patch("os.chdir")
def test_data_foundation(self, mock_chdir):
with patch("casa_envoy.foundation.RestoreFoundation.extract_cal") as extract:
with patch("casa_envoy.foundation.RestoreFoundation.set_permissions") as permissions:
foundation.data_foundation()
assert mock_chdir.call_count == 1
@patch("pathlib.Path.exists", return_value=True)
@patch("tarfile.open")
def test_extract_cal(self, mock_tar, mock_path):
foundation.extract_cal()
assert mock_tar.call_count == 1
@patch("os.path.join")
@patch("os.chmod")
@patch("os.walk")
def test_set_permissions(self, mock_walk, mock_chmod, mock_join):
foundation.set_permissions()
assert mock_walk.call_count == 1
......@@ -8,8 +8,8 @@ cal_parameters = {
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
"metadata": "test/test.json",
"ppr": "test/PPR.xml",
"metadata": "test/input_files/test.json",
"ppr": "test/input_files/PPR.xml",
"product_type": "standard-cal",
}
img_parameters = {
......@@ -17,18 +17,28 @@ img_parameters = {
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
"metadata": "test/image-metadata.json",
"ppr": "test/cmsimage-PPR.xml",
"metadata": "test/input_files/image-metadata.json",
"ppr": "test/input_files/cmsimage-PPR.xml",
"product_type": "standard-img",
}
restore_parameters = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
"metadata": "test/input_files/restore.json",
"ppr": "test/input_files/PPR.xml",
"product_type": "restore",
}
class TestCasaLauncher:
def test_setup_environment(self):
CasaLauncher(parameters=cal_parameters).setup_environment()
assert os.environ.get("SCIPIPE_ROOTDIR") == cal_parameters["rootDirectory"]
assert os.environ.get("CASA_HOME") == cal_parameters["homeForReprocessing"]
assert os.environ.get("PPR_FILENAME") == "test/PPR.xml"
assert os.environ.get("PPR_FILENAME") == "test/input_files/PPR.xml"
@patch("subprocess.run")
@patch("os.chdir")
......@@ -72,3 +82,8 @@ class TestImagingLauncher:
def test_check_imageable(self):
check = ImagingLauncher(parameters=img_parameters).check_imageable()
assert check is True
def test_check_cal_and_imageable(self):
restore_parameters["product_type"] = "integrated"
check = ImagingLauncher(parameters=restore_parameters).check_cal_and_imageable()
assert check is True
......@@ -165,9 +165,21 @@ class NgasFile(LocatedFile):
def __str__(self):
return f"NGAS {self.ngas_file_id} from {self.server} -> {self.subdirectory}/{self.relative_path}"
def __eq__(self, other):
return (
self.ngas_file_id == other.ngas_file_id
and self.subdirectory == other.subdirectory
and self.relative_path == other.relative_path
and self.checksum == other.checksum
and self.checksum_type == other.checksum_type
and self.version == other.version
and self.size == other.size
and self.server == other.server
)
class OracleXml(LocatedFile):
""" Represents the metadata of an ALMA SDM stored as XML in the ALMA DB """
"""Represents the metadata of an ALMA SDM stored as XML in the ALMA DB"""
archive_uid: str
table: str
......@@ -195,7 +207,7 @@ class OracleXml(LocatedFile):
class FileLocator(Locator):
""" Loads a locations report from a .json report file """
"""Loads a locations report from a .json report file"""
def __init__(self, file: Path):
self.file = file
......@@ -205,7 +217,7 @@ class FileLocator(Locator):
class ServiceLocator(Locator):
""" Acquires a locations report from the archiveService """
"""Acquires a locations report from the archiveService"""
def __init__(self, science_product_locator: str):
self.spl = science_product_locator
......@@ -216,9 +228,7 @@ class ServiceLocator(Locator):
# this is needed to prevent SSL errors when tests are run
# inside a Docker container
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ":HIGH:!DH:!aNULL"
requests.Session().mount(
settings.locatorServiceUrlPrefix, adapter=requests.adapters.HTTPAdapter()
)
requests.Session().mount(settings.locatorServiceUrlPrefix, adapter=requests.adapters.HTTPAdapter())
try:
response = requests.get(settings.locatorServiceUrlPrefix, params={"locator": self.spl})
......@@ -237,7 +247,7 @@ class ServiceLocator(Locator):
class NgasServerSchema(Schema):
""" marshmallow schema to interpret "server" section of a "files" item """
"""marshmallow schema to interpret "server" section of a "files" item"""
server = fields.Str()
location = fields.Str()
......@@ -249,7 +259,7 @@ class NgasServerSchema(Schema):
class NgasFileSchema(Schema):
""" One of the items in a location report's "files" list """
"""One of the items in a location report's "files" list"""
ngas_file_id = fields.Str()
subdirectory = fields.Str(allow_none=True)
......@@ -276,7 +286,7 @@ class NgasFileSchema(Schema):
class LocationReportSchema(Schema):
""" Encapsulates an entire locations report """
"""Encapsulates an entire locations report"""
files = fields.List(fields.Nested(NgasFileSchema()))
aggregate_size = fields.Integer()
......
""" Command-line parsing conveniences """
import argparse
import itertools
import sys
from enum import Enum
from pathlib import Path
from typing import List
# pylint: disable=E0401, E0402, R0913
from .exceptions import *
from .exceptions import FetchError
from .fetcher_factory import ConfiguredFetcherFactory
from .fetchers import ForceMode
from .interfaces import FetchPlan, FetcherFactory, FileFetcher
......@@ -23,7 +24,7 @@ class RetrievalMode(Enum):
class CLIParam(Enum):
""" Codifies productfetcher's various command-line parameters """
"""Codifies productfetcher's various command-line parameters"""
SPL = "--product-locator"
FILE = "--location-file"
......@@ -41,18 +42,18 @@ providing the path to a product locator report."""
class FetchContext:
""" Handles the various command-line options """
"""Handles the various command-line options"""
def __init__(
self,
locator: Locator,
locators: List[Locator],
streaming: bool,
direct_copy: bool,
dry_run=False,
force=False,
concurrency=1,
):
self.locator = locator
self.locators = locators
self.streaming = streaming
self.direct_copy = direct_copy
self.dry_run = dry_run
......@@ -66,7 +67,7 @@ class FetchContext:
:return: a plan
"""
# Get the location report
report = self.locator.locate()
reports = [locator.locate() for locator in self.locators]
# Using the arguments we've parsed, generate the fetcher factory
factory = ConfiguredFetcherFactory(
......@@ -75,11 +76,12 @@ class FetchContext:
direct_copy=self.direct_copy,
dry_run=self.dry_run,
force=self.force,
aggregate_size=report.aggregate_size,
aggregate_size=sum([report.aggregate_size for report in reports]),
)
# First we must prepare the fetchers for the location report
all_fetchers = report.fetchers(factory)
# this itertools.chain is a recipe from the itertools documentation for flattening nested lists
all_fetchers = list(itertools.chain.from_iterable(report.fetchers(factory) for report in reports))
# now we have enough stuff to proceed with generating the plan
return self.calculate_plan(factory, all_fetchers)
......@@ -129,17 +131,22 @@ class FetchContext:
:param args:
:return:
"""
namespace = FetchContext.arg_parser().parse_args(args)
parser = FetchContext.arg_parser()
namespace = parser.parse_args(args)
# determine the locator
locator = (
FileLocator(Path(namespace.location_file))
if namespace.location_file
else ServiceLocator(namespace.product_locator)
)
# determine the locators
location_files: List[Locator] = [FileLocator(Path(file)) for file in namespace.location_files]
service_locators: List[Locator] = [ServiceLocator(spl) for spl in namespace.product_locators]
locators = location_files + service_locators
# we can no longer rely on the argument parser to enforce this constraint, so we have to do it
# ourselves here
if len(locators) == 0:
parser.error("You must supply at least one science product locator or location file")
return FetchContext(
locator,
locators,
namespace.streaming,
namespace.direct_copy,
namespace.dry_run,
......@@ -181,19 +188,15 @@ class FetchContext:
default=16,
)
exclusive_group = parser.add_mutually_exclusive_group(required=True)
exclusive_group.add_argument(
CLIParam.SPL.value,
action="store",
dest="product_locator",
help="product locator to download",
parser.add_argument(
CLIParam.SPL.value, action="append", dest="product_locators", help="product locator to download", default=[]
)
exclusive_group.add_argument(
parser.add_argument(
CLIParam.FILE.value,
action="store",
dest="location_file",
action="append",
dest="location_files",
help="product locator report (in JSON)",
default=[],
)
dry_or_force_options = parser.add_mutually_exclusive_group(required=False)
......
......@@ -39,9 +39,7 @@ class FakeFactory(FetcherFactory):
def fetch_oracle_xml(self, file: LocatedFile) -> FileFetcher:
return DryRunFakeFileFetcher(file)
def fetch_plan(
self, fetchers: List[Union[FileFetcher, FetchPlan]], concurrency: int = 1
) -> FetchPlan:
def fetch_plan(self, fetchers: List[Union[FileFetcher, FetchPlan]], concurrency: int = 1) -> FetchPlan:
return FakePlan(fetchers, concurrency)
......@@ -66,6 +64,36 @@ def test_plan_generation(resource_path_root):
assert sum(len(subplan.fetchers) for subplan in plan.fetchers) == len(report.files)
def test_multiple_locator_fetching(capsys, resource_path_root):
"""
Can we handle multiple locators?
:param capsys:
:return:
"""
img = resource_path_root / "location_files" / "IMG.json"
cal = resource_path_root / "location_files" / "CALIBRATION.json"
# parse the command line with these two
fc = FetchContext.parse_commandline([CLIParam.FILE.value, str(img), CLIParam.FILE.value, str(cal)])
assert len(fc.locators) == 2
# let's make the plan and ensure we have all the stuff we expect from both
plan = fc.generate_plan()
# we'll need to open these files ourselves to figure out what fetchers we expect
for locator_file in [img, cal]:
for file in FileLocator(locator_file).locate().files:
seen = False
# there may be a more "test friendly" way of doing this, such as by asking the plan
# if it is fetching a certain file, but it seems like a lot of refactoring for this
# one test, so I'm going to leave it alone for now
for hostgroup_fetcher in plan.fetchers:
seen = seen or file in [fetcher.file for fetcher in hostgroup_fetcher.fetchers]
assert seen
def test_argument_parsing(capsys):
"""
Can we parse the command-line arguments passed in?
......@@ -78,12 +106,6 @@ def test_argument_parsing(capsys):
# must have an SPL or a file
FetchContext.parse_commandline([])
with pytest.raises(SystemExit):
# cannot have an SPL and a file
FetchContext.parse_commandline(
[CLIParam.FILE.value, "foo", CLIParam.SPL.value, "uid://this/is/a/fakesy"]
)
# check the dry run value
fc = FetchContext.parse_commandline([CLIParam.DRY.value, CLIParam.FILE.value, "foo"])
assert fc.dry_run
......@@ -97,16 +119,14 @@ def test_argument_parsing(capsys):
assert fc.force == ForceMode.NORMAL
fc = FetchContext.parse_commandline([CLIParam.FILE.value, "foo"])
assert isinstance(fc.locator, FileLocator)
assert fc.locator.file == Path("foo")
assert isinstance(fc.locators[0], FileLocator)
assert fc.locators[0].file == Path("foo")
fc = FetchContext.parse_commandline([CLIParam.SPL.value, "uid://this/is/a/fakesy"])
assert isinstance(fc.locator, ServiceLocator)
assert fc.locator.spl == "uid://this/is/a/fakesy"
assert isinstance(fc.locators[0], ServiceLocator)
assert fc.locators[0].spl == "uid://this/is/a/fakesy"
fc = FetchContext.parse_commandline(
[CLIParam.FILE.value, "foo", CLIParam.CONCURRENCY.value, "732"]
)
fc = FetchContext.parse_commandline([CLIParam.FILE.value, "foo", CLIParam.CONCURRENCY.value, "732"])
assert fc.concurrency == 732
capsys.readouterr()
"""
Extracts workflow relevant metadata from the NRAO archive based on Science Product Locator.
"""
import argparse
import json
import logging
import sys
import pendulum
import psycopg2 as pg
from aat_wrest.observation_wrester import ObservationWrester
from aat_wrest.utilities import PENDULUM_FORMAT, TIME_ZONE, MDDBConnector
from pycapo import CapoConfig
from typing import List
logger = logging.getLogger("aat_wrest")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
from aat_wrest.utilities import PENDULUM_FORMAT, TIME_ZONE, MDDBConnector
class WrestWorkflowMetadata:
......@@ -23,18 +16,15 @@ class WrestWorkflowMetadata:
"""
def __init__(
self, connection: MDDBConnector, spl: str = None, fileset_id: str = None, sdm_id: str = None
self,
connection: MDDBConnector,
spl: List[str] = None,
sdm_id: str = None,
):
self.logger = logging.getLogger("aat_wrest")
self.conn = connection
if fileset_id is not None:
self.fileset_id = fileset_id
if sdm_id is not None:
self.sdm_id = sdm_id
if not spl and fileset_id:
self.spl = self.wrest_obs_metadata_from_fileset_id(fileset_id)["spl"]
else:
self.spl = spl
self.sdm_id = sdm_id
self.spl = spl
def wrest_standard_cal_info(self) -> json:
"""
......@@ -85,8 +75,8 @@ class WrestWorkflowMetadata:
def wrest_standard_image_info(self) -> json:
"""
Given an execution block science product locator, returns the required metadata to run
the standard calibration workflow
Given an execution block SDM ID, returns the required metadata to run
the standard imaging workflow
:return:
"""
......@@ -130,82 +120,57 @@ class WrestWorkflowMetadata:
self.conn.close()
return make_json
def wrest_obs_metadata_from_fileset_id(self, fileset_id: str) -> str:
def wrest_restore_info(self) -> json:
"""
Given a fileset_id, query the Metadata DB and return the corresponding science_product_locator
:param fileset_id:
:return science_product_locator:
Given an execution block science product locator and calibration science product locator, returns
the required metadata to run the restore CMS workflow
:return:
"""
metadata = {
"spl": None,
"bands": None,
"array_config": None,
"obs_start_time": None,
"obs_end_time": None,
}
sql = f"""
SELECT science_product_locator, band_code, configuration, starttime, endtime
FROM execution_blocks
WHERE ngas_fileset_id = %(fileset_id)s
eb_sql = f"""
SELECT ngas_fileset_id as filesetId,
e.project_code as projectCode,
p.title as title,
e.starttime as startTime,
(a.firstname || ' ' || a.lastname) as observer,
telescope as telescope
FROM execution_blocks e
JOIN projects p on e.project_code = p.project_code
JOIN authors a on p.project_code = a.project_code
WHERE science_product_locator = %(spl)s AND a.is_pi = true
"""
with self.conn.cursor() as cursor:
cursor.execute(sql, {"fileset_id": fileset_id})
data = cursor.fetchall()
metadata["spl"] = data[0][0]
metadata["bands"] = data[0][1]
metadata["array_config"] = data[0][2]
metadata["obs_start_time"] = data[0][3]
metadata["obs_end_time"] = data[0][4]
return metadata
def parser() -> argparse.ArgumentParser:
arg_parser = argparse.ArgumentParser(
description="Workspaces-to-Archive Metadata Exchange",
formatter_class=argparse.RawTextHelpFormatter,
)
arg_parser.add_argument(
"-sc",
"--stdcals",
nargs=1,
action="store",
required=False,
help="Find workflow metadata for standard calibrations with provided product locator",
)
arg_parser.add_argument(
"-si",
"--stdimg",
nargs=1,
action="store",
required=False,
help="Find workflow metadata for standard CMS imaging with provided SDM id",
)
arg_parser.add_argument(
"-obs",
"--observation",
nargs=1,
action="store",
required=False,
help="Find display metadata for observations by product locator",
)
return arg_parser
def determine_wrester(connection: MDDBConnector, args: argparse.Namespace):
if args.stdcals:
data = WrestWorkflowMetadata(connection, spl=args.stdcals[0]).wrest_standard_cal_info()
if args.stdimg:
data = WrestWorkflowMetadata(connection, sdm_id=args.stdimg[0]).wrest_standard_image_info()
if args.observation:
data = ObservationWrester(connection, spl=args.observation[0]).wrest_observation_info()
print(data)
def main():
args = parser().parse_args()
connection = MDDBConnector()
cal_sql = f"""
SELECT external_name as calSdmId
from science_products WHERE science_product_locator = %(cal_spl)s
"""
determine_wrester(connection, args)
make_json = {}
try:
cursor = self.conn.cursor()
cursor.execute(eb_sql, {"spl": self.spl[0]})
data = cursor.fetchall()
cursor.execute(cal_sql, {"cal_spl": self.spl[1]})
cal_data = cursor.fetchall()
if data and cal_data:
make_json = json.dumps(
{
"sdmId": data[0][0],
"calSdmId": cal_data[0][0],
"projectCode": data[0][1],
"title": data[0][2],
"startTime": data[0][3],
"observer": data[0][4],
"telescope": data[0][5],
"created_at": str(
pendulum.now().in_timezone(TIME_ZONE).format(PENDULUM_FORMAT)
),
}
)
else:
self.logger.error(
f"ERROR: aat-wrest query returned no results!"
f" The database appears to be missing information for either SPL {self.spl[0]} or Cal SPL {self.spl[1]}!"
)
finally:
self.conn.close()
return make_json
......@@ -11,17 +11,60 @@ import pendulum
from aat_wrest.utilities import MDDBConnector, TIME_ZONE, format_interval
class ObservationWrester:
class WrestObservationMetadata:
"""
Class for extracting observation metadata
"""
def __init__(self, connection: MDDBConnector, spl: str):
def __init__(self, connection: MDDBConnector, sdm_id: str):
self.logger = logging.getLogger("aat_wrest")
self.conn = connection
self.spl = spl
self.sdm_id = sdm_id
def wrest_observation_info(self) -> json:
"""
Given a sdm_id, query the Metadata DB for observation data related to an ingestion-complete event
:param sdm_id:
:return: observation metadata as JSON
"""
metadata = {}
sql = f"""
SELECT science_product_locator,
band_code,
configuration,
starttime,
endtime
FROM execution_blocks
WHERE ngas_fileset_id = %(sdm_id)s
"""
try:
cursor = self.conn.cursor()
cursor.execute(sql, {"sdm_id": self.sdm_id})
data = cursor.fetchall()
if data:
metadata = json.dumps(
{
"spl": data[0][0],
"bands": data[0][1],
"array_config": data[0][2],
"obs_start_time": data[0][3],
"obs_end_time": data[0][4],
}
)
else:
self.logger.error(
f"ERROR: aat-wrest query returned no results!"
f" The database appears to be missing information for SDM id {self.sdm_id}"
)
finally:
self.conn.close()
return metadata
#
# currently unused. Was intended for analyst list display of waiting capability requests/executions
#
def wrest_observation_time_info(self) -> json:
"""
Given a product locator, reports the project code, observation length,
and time in queue for an incomplete observation.
......