Commit 99f1619c authored by Charlotte Hausman
restore capability and workflow

parent 6165066b
1 merge request: !340 restore capability and workflow
Pipeline #2194 passed
Showing 598 additions and 34 deletions
# CASA Envoy: The Workspaces CASA Processing System
CASA Envoy is responsible for environment setup and launch of all types of CASA processing for the Workspaces System.
Currently, this includes Standard Calibration, CMS Restores, Standard CMS Imaging, and CMS Restore-and-Image.
```
usage: casa_envoy [-h] [-c STANDARD_CAL STANDARD_CAL] [-i STANDARD_IMG STANDARD_IMG] [--restore] [--integrated]
Workspaces CASA processing launcher
optional arguments:
-h, --help show this help message and exit
-c, --standard-cal run the standard calibration CASA pipeline
-i, --standard-img run the standard imaging CASA pipeline
--restore run the restore measurement set CASA pipeline, use in conjunction with '-c'
--integrated run an integrated calibration-imaging pipeline, use in conjunction with '-i'
```
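For example, the restore workflow added by this commit runs a calibration-style launch with the `--restore` flag; this mirrors the `restore_cms.sh` template later in this diff, with the two standard input files:
```
# standard calibration
casa_envoy -c metadata.json PPR.xml

# CMS restore: same two inputs, plus the restore flag
casa_envoy --restore -c metadata.json PPR.xml
```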
While initiating CASA is relatively straightforward and essentially identical between use cases, there are variations in
the required PPR structure and the initial location of input data. For this reason, CASA Envoy's functionality
can be broken into two underlying parts: Setup and Launch.
## Setup
Setup can be further divided into three essential parts: Auditing, Environment, and Data.
### Auditing
There are two types of auditing in CASA Envoy. The first, Directory Auditing, ensures that the required
directory structure of *rawdata*, *working*, and *products* is present in the processing directory.
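A processing directory that passes this audit has the following shape (the directory name here is illustrative):
```
tmpabcd1234/      # processing directory
├── rawdata/      # input data placed by the product fetcher
├── working/      # CASA scratch space; receives the corrected PPR.xml
└── products/     # outputs, and extracted calibration tables for restores
```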
The second type is File Auditing. CASA Envoy must be provided two different files at initial call, *metadata.json* and *PPR.xml*. These files are
audited to ensure that all required fields are present. *PPR.xml* is submitted directly to CASA, so it is important
to make sure that it is structured correctly before submission. While *metadata.json* is not submitted to CASA, it does
contain all the information for results delivery after CASA runs, so it needs to be audited as well.
A further function of File Auditing is correcting *PPR.xml* for processing with HTCondor. CASA requires the name of
the processing directory it is running in; unfortunately, with HTCondor it isn't possible to know that name prior to
submitting a Condor job. Therefore, *PPR.xml* is corrected after submission to Condor with the correct directory
information. The corrected *PPR.xml* is placed into the *working* directory, where it is then used by CASA, and the
unaltered original remains in the parent processing directory.
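The real logic lives in `AuditFiles.correct_for_condor` (exercised by the tests in this commit); a minimal sketch of the idea, assuming the directory name is carried in the PPR's `RelativePath` element:
```python
import os
import xml.etree.ElementTree as ET


def correct_for_condor(ppr_path: str = "PPR.xml") -> str:
    """Rewrite RelativePath with the directory condor actually assigned."""
    tree = ET.parse(ppr_path)
    # the directory name is only known once the job is running in its scratch space
    processing_dir = os.path.basename(os.getcwd())
    for element in tree.iter("RelativePath"):
        element.text = processing_dir
    corrected_path = os.path.join("working", os.path.basename(ppr_path))
    tree.write(corrected_path)  # the original PPR.xml stays untouched
    return corrected_path
```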
### Environment
CASA has several environment variables that must be set by whatever system it is running on.
These include:
| ENV Variables | Description |
|--- |--- |
| SCIPIPE_ROOTDIR | location of the processing directory |
| CASA_HOME | location of the CASA version to use for processing |
| PPR_FILENAME | name of the PPR file (should always be PPR.xml, but may occasionally differ) |
| LANG | Condor does not set this correctly for CASA, so it is set explicitly here |
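A sketch of the corresponding setup (the variable names match what the test suite in this commit asserts; the LANG value is an assumption):
```python
import os


def setup_environment(parameters: dict) -> None:
    os.environ["SCIPIPE_ROOTDIR"] = parameters["rootDirectory"]
    os.environ["CASA_HOME"] = parameters["homeForReprocessing"]
    os.environ["PPR_FILENAME"] = parameters["ppr"]
    # condor jobs can arrive with LANG unset or wrong; pin a sane locale for CASA
    os.environ["LANG"] = "en_US.UTF-8"
```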
### Data
The starting location for data is *rawdata*, where the Workspaces Product Fetcher always places downloaded data.
For Standard Calibration and Standard CMS Imaging this is correct. However, for
calibration restores, the calibration tar file needs to be extracted into the *products* directory. This step ensures
that all input data is in the correct location for CASA to find when required.
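The `RestoreFoundation` class added by this commit implements that step; in miniature:
```python
import tarfile
from pathlib import Path

# the second entry of the metadata's fileSetIds names the calibration tar
cal_path = Path("rawdata") / "calibration.tar"
if cal_path.exists():
    with tarfile.open(cal_path) as calibration:
        calibration.extractall(path="products")
```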
## Launch
Since the only difference between calibration and imaging processing is the contents of *PPR.xml*, CASA Envoy contains
a single CASA Launcher class which is utilized by the two type launcher classes: Calibration Launcher and Imaging
Launcher. Each type launcher handles both standard and restore (or integrated) processing. The two type classes ensure that setup is correct for each product type, as described above, and then call
the CASA Launcher.
After CASA processing completes, the CASA log is checked for error flags and CASA Envoy exits.
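The log check itself is not part of this diff; a minimal sketch, assuming CASA marks failures with SEVERE entries in its log:
```python
from pathlib import Path


def casa_log_clean(log_path: str) -> bool:
    """Return True when the CASA log contains no error flags."""
    return "SEVERE" not in Path(log_path).read_text()
```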
casa_envoy/auditor.py:
```
@@ -34,6 +34,10 @@ def get_fields_for(product_type: str, filename: str) -> list:
        "sdmId",
    ]

    restore_metadata_list = [
        "calProductLocator",
    ]

    ppr_list = ["RootDirectory", "RelativePath", "SdmIdentifier"]

    if ".xml" in filename:
@@ -43,6 +47,8 @@ def get_fields_for(product_type: str, filename: str) -> list:
        return cal_metadata_list
    elif ".json" in filename and "img" in product_type:
        return img_metadata_list
    elif ".json" in filename and product_type == "restore":
        return cal_metadata_list + restore_metadata_list


def get_xml_content(file: AbstractTextFile):
@@ -126,10 +132,12 @@ class AuditFiles(AuditorIF):
class AuditDirectories(AuditorIF):
    def __init__(self, ppr: AbstractTextFile, settings: Dict[str, str]):
        self.logger = logging.getLogger("casa_envoy")
        self.parameters = settings
        self.rootDirectory = settings["rootDirectory"]
        self.relative_path = settings["processingDirectory"]
        self.sdmId = get_value_for(ppr, "SdmIdentifier")

    @property
    def audit(self) -> bool:
        current = os.getcwd()
        needed = self.rootDirectory + "/" + self.relative_path
@@ -146,11 +154,21 @@ class AuditDirectories(AuditorIF):
            data = os.listdir(Path(current + "/rawdata/"))
            if len(data) > 0:
                self.logger.info("Data is available. Proceeding...")
                if self.parameters["product_type"] == "restore":
                    self.logger.info("Checking products/ for calibration tables...")
                    cal_data = os.listdir(Path(current + "/products/"))
                    if len(cal_data) > 0:
                        self.logger.info("Calibration data is available. Proceeding...")
                    else:
                        self.logger.error("FAILURE: calibration data not found in products/")
                        return False
                return True
            else:
                self.logger.error("FAILURE: data not found in rawdata/")
                return False
        else:
            self.logger.error(
                "DIRECTORY ERROR: A directory is missing from the processing root directory."
            )
```
"""
Classes and methods for laying the data location foundation for various types of CASA processing
"""
import logging
import os
import tarfile
from pathlib import Path
import json
from casa_envoy.schema import AbstractTextFile
from casa_envoy.interfaces import FoundationIF
class RestoreFoundation(FoundationIF):
def __init__(self, parameters: dict, metadata: AbstractTextFile):
self.logger = logging.getLogger("casa_envoy")
self.parameters = parameters
self.parent_path = parameters["parent_path"]
self.metadata = metadata
def data_foundation(self):
"""
CMS Restore requires two inputs: An EB and a Calibration for that EB
After download, all data is in rawdata and the calibrations tables are contained
in a tar file. We need to extract all the calibration tables to the products directory
for CASA processing.
:return:
"""
self.logger.info("LAYING DATA FOUNDATION...")
# ensure we're starting from the parent directory
os.chdir(self.parent_path)
self.extract_cal()
self.set_permissions()
self.logger.info("DATA FOUNDATION COMPLETE!")
def extract_cal(self):
self.logger.info("Extracting calibration tar file to products directory...")
cal_name = json.loads(self.metadata.content)["fileSetIds"][1]
cal_path = "./rawdata" + cal_name
if Path(cal_path).exists():
calibration = tarfile.open(cal_path)
# extract to products
calibration.extractall(path="./products")
calibration.close()
else:
self.logger.error(f"ERROR: calibration tar file {cal_name} not found in rawdata!")
def set_permissions(self):
self.logger.info("Ensuring correct file permissions....")
path = Path("./products")
for root, dirs, files in os.walk(path):
for d in dirs:
os.chmod(os.path.join(root, d), 0o755)
for f in files:
os.chmod(os.path.join(root, f), 0o755)
casa_envoy/interfaces.py:
```
@@ -26,5 +26,18 @@ class AuditorIF(ABC):
    Generic functionality implementation for auditor classes
    """

    @abc.abstractmethod
    def audit(self):
        pass


class FoundationIF(ABC):
    """
    Generic Foundation methods

    Should be implemented for any type of CASA processing that requires initial data placement
    in locations other than rawdata after download
    """

    @abc.abstractmethod
    def data_foundation(self):
        pass
```
casa_envoy/launchers.py:
```
@@ -9,6 +9,7 @@ from typing import Dict
import json

from casa_envoy.auditor import AuditFiles, AuditDirectories
from casa_envoy.foundation import RestoreFoundation
from casa_envoy.interfaces import LauncherIF
from casa_envoy.schema import AbstractTextFile
@@ -102,23 +103,49 @@ class CalibrationLauncher(LauncherIF):
        self.metadata = get_abs_file(parameters.get("metadata"))

    def launch_casa(self):
        if self.parameters["product_type"] == "restore":
            check_input = self.check_restorable()
        else:
            check_input = self.check_calibratable()

        if check_input:
            self.prepare_for_casa()
            self.run_audit(self.parameters)
            CasaLauncher(self.parameters).run()
        else:
            self.logger.error("ERROR: Provided SPL(s) are not correct type(s) for processing!")
            sys.exit(1)

    def prepare_for_casa(self):
        # Ensure all data is in the required locations for CASA processing (this is not always rawdata!)
        if self.parameters["product_type"] == "restore":
            RestoreFoundation(self.parameters, self.metadata).data_foundation()
        return

    def check_calibratable(self) -> bool:
        spl = self.metadata.content["productLocator"]
        if "execblock" in spl:
            return True
        else:
            self.logger.error(
                "SPL ERROR: This product locator is not calibratable! Please check your inputs"
            )
            return False

    def check_restorable(self) -> bool:
        spl = self.metadata.content["productLocator"]
        cal_spl = self.metadata.content["calProductLocator"]
        if "execblock" in spl and "calibration" in cal_spl:
            return True
        else:
            self.logger.error(
                "SPL ERROR: This set of product locators is not restorable! Please check your inputs"
            )
            return False

    def run_audit(self, parameters: Dict[str, str]):
        dir_audit = AuditDirectories(self.ppr, parameters).audit
        if dir_audit:
            self.logger.info("Directory audit successful!")
        else:
@@ -159,7 +186,7 @@ class ImagingLauncher(LauncherIF):
            return False

    def run_audit(self, parameters: Dict[str, str]):
        dir_audit = AuditDirectories(self.ppr, parameters).audit
        if dir_audit:
            self.logger.info("Directory audit successful!")
        else:
```
casa_envoy command-line interface:
```
@@ -50,6 +50,7 @@ def arg_parser() -> argparse.ArgumentParser:
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "-c",
        "--standard-cal",
        nargs=2,
        action="store",
@@ -57,11 +58,24 @@ def arg_parser() -> argparse.ArgumentParser:
        help="run the standard calibration CASA pipeline",
    )
    parser.add_argument(
        "-i",
        "--standard-img",
        nargs=2,
        action="store",
        required=False,
        help="run the standard imaging CASA pipeline",
    )
    parser.add_argument(
        "--restore",
        required=False,
        action="store_true",
        help="run the restore measurement set CASA pipeline, use in conjunction with '-c'",
    )
    parser.add_argument(
        "--integrated",
        required=False,
        action="store_true",
        help="run an integrated calibration-imaging pipeline, use in conjunction with '-i'",
    )
    return parser
@@ -73,7 +87,10 @@ def main():
    if args.standard_cal is not None:
        parameters = _get_settings(pathlib.Path(path), args.standard_cal)
        if args.restore:
            parameters["product_type"] = "restore"
        else:
            parameters["product_type"] = "standard-cal"
        CalibrationLauncher(parameters).launch_casa()

        # make sure we return to the parent directory after processing
@@ -81,7 +98,10 @@ def main():
    elif args.standard_img is not None:
        parameters = _get_settings(pathlib.Path(path), args.standard_img)
        if args.integrated:
            parameters["product_type"] = "integrated"
        else:
            parameters["product_type"] = "standard-img"
        ImagingLauncher(parameters).launch_casa()

        # return to parent directory after processing
```
New test fixture with restore metadata (referenced by the tests below as test/input_files/restore.json):
```json
{
  "fileSetIds": ["brain_000.58099.67095825232", "calibration.tar"],
  "workflowName": "restore_cms",
  "systemId": "2",
  "productLocator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
  "calProductLocator": "uid://evla/calibration/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
  "projectMetadata": {
    "projectCode": "Operations",
    "title": "",
    "startTime": "58099.6710792824",
    "observer": "VLA Operations"
  },
  "destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpabcd1234"
}
```
auditor tests:
```
@@ -18,13 +18,15 @@ cal_settings = {
    "homeForReprocessing": "/home/casa/packages/pipeline/current",
    "rootDirectory": "/tmp/workspaces_tmp/",
    "processingDirectory": "tmpiox5trbp",
    "metadata": "test/input_files/test.json",
    "ppr": "test/input_files/PPR.xml",
    "product_type": "standard-cal",
}

test_ppr = AbstractTextFile(
    filename="test/input_files/PPR.xml", content=Path("test/input_files/PPR.xml").read_text()
)
test_cal_metadata = AbstractTextFile(
    filename="test/input_files/test.json", content=Path("test/input_files/test.json").read_text()
)

img_settings = {
@@ -32,15 +34,17 @@ img_settings = {
    "homeForReprocessing": "/home/casa/packages/pipeline/current",
    "rootDirectory": "/tmp/workspaces_tmp/",
    "processingDirectory": "tmpiox5trbp",
    "metadata": "test/input_files/image-metadata.json",
    "ppr": "test/input_files/cmsimage-PPR.xml",
    "product_type": "standard-img",
}

test_img_ppr = AbstractTextFile(
    filename="test/input_files/cmsimage-PPR.xml",
    content=Path("test/input_files/cmsimage-PPR.xml").read_text(),
)
test_img_metadata = AbstractTextFile(
    filename="test/input_files/image-metadata.json",
    content=Path("test/input_files/image-metadata.json").read_text(),
)
@@ -138,7 +142,7 @@ class TestAuditFiles:
        ppr = AuditFiles([test_cal_metadata, test_ppr], cal_settings).correct_for_condor(
            ppr=test_ppr
        )
        assert ppr.filename == "test/input_files/PPR.xml"

    def test_audit(self):
        result = AuditFiles([test_cal_metadata, test_ppr], cal_settings).audit()
@@ -147,5 +151,5 @@ class TestAuditFiles:
class TestAuditDirectories:
    def test_audit(self):
        result = AuditDirectories(test_ppr, cal_settings).audit
        assert result is False
```
palaver (CLI) tests:
```
@@ -12,8 +12,8 @@ expected_settings = {
    "rootDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool",
    "processingDirectory": "tmpo1ca1pp_",
    "parent_path": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_",
    "metadata": "test/input_files/test.json",
    "ppr": "test/input_files/PPR.xml",
    "product_type": "standard-cal",
}

args = argparse.Namespace()
@@ -21,7 +21,8 @@ args = argparse.Namespace()
class TestPalaver:
    def test_get_settings(self):
        args.standard_cal = ["test/input_files/test.json", "test/input_files/PPR.xml"]
        args.restore = False

        with patch(
            "pathlib.Path.cwd",
@@ -40,7 +41,8 @@ class TestPalaver:
    @patch("os.chdir")
    @patch("os.getcwd")
    def test_main_cal(self, mock_cwd, mock_chdir):
        args.standard_cal = ["test/input_files/test.json", "test/input_files/PPR.xml"]
        args.integrated = False

        with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
            with patch("casa_envoy.launchers.CalibrationLauncher.launch_casa") as cal_launcher:
@@ -52,7 +54,10 @@ class TestPalaver:
    @patch("os.chdir")
    @patch("os.getcwd")
    def test_main_img(self, mock_cwd, mock_chdir):
        args.standard_img = [
            "test/input_files/image-metadata.json",
            "test/input_files/cmsimage-PPR.xml",
        ]

        with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
            with patch("casa_envoy.launchers.ImagingLauncher.launch_casa") as img_launcher:
```
"""
Tests for casa_envoy.foundation
"""
from pathlib import Path
from unittest.mock import patch
from casa_envoy.schema import AbstractTextFile
from casa_envoy.foundation import RestoreFoundation
parameters = expected_settings = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool",
"processingDirectory": "tmpo1ca1pp_",
"parent_path": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_",
"metadata": "test/input_files/restore.json",
"ppr": "test/input_files/PPR.xml",
"product_type": "restore",
}
test_restore_metadata = AbstractTextFile(
filename="test/input_files/restore.json",
content=Path("test/input_files/restore.json").read_text(),
)
foundation = RestoreFoundation(parameters=parameters, metadata=test_restore_metadata)
class TestRestoreFoundation:
@patch("os.chdir")
def test_data_foundation(self, mock_chdir):
with patch("casa_envoy.foundation.RestoreFoundation.extract_cal") as extract:
with patch("casa_envoy.foundation.RestoreFoundation.set_permissions") as permissions:
foundation.data_foundation()
assert mock_chdir.call_count == 1
@patch("pathlib.Path.exists", return_value=True)
@patch("tarfile.open")
def test_extract_cal(self, mock_tar, mock_path):
foundation.extract_cal()
assert mock_tar.call_count == 1
@patch("os.path.join")
@patch("os.chmod")
@patch("os.walk")
def test_set_permissions(self, mock_walk, mock_chmod, mock_join):
foundation.set_permissions()
assert mock_walk.call_count == 1
launcher tests:
```
@@ -8,8 +8,8 @@ cal_parameters = {
    "homeForReprocessing": "/home/casa/packages/pipeline/current",
    "rootDirectory": "/tmp/workspaces_tmp/",
    "processingDirectory": "tmpiox5trbp",
    "metadata": "test/input_files/test.json",
    "ppr": "test/input_files/PPR.xml",
    "product_type": "standard-cal",
}

img_parameters = {
@@ -17,8 +17,8 @@ img_parameters = {
    "homeForReprocessing": "/home/casa/packages/pipeline/current",
    "rootDirectory": "/tmp/workspaces_tmp/",
    "processingDirectory": "tmpiox5trbp",
    "metadata": "test/input_files/image-metadata.json",
    "ppr": "test/input_files/cmsimage-PPR.xml",
    "product_type": "standard-img",
}
@@ -28,7 +28,7 @@ class TestCasaLauncher:
        CasaLauncher(parameters=cal_parameters).setup_environment()
        assert os.environ.get("SCIPIPE_ROOTDIR") == cal_parameters["rootDirectory"]
        assert os.environ.get("CASA_HOME") == cal_parameters["homeForReprocessing"]
        assert os.environ.get("PPR_FILENAME") == "test/input_files/PPR.xml"

    @patch("subprocess.run")
    @patch("os.chdir")
```
aat_wrest workflow metadata wrester:
```
@@ -7,10 +7,10 @@
import logging
import sys
from typing import List

import pendulum

from aat_wrest.observation_wrester import ObservationWrester
from aat_wrest.utilities import PENDULUM_FORMAT, TIME_ZONE, MDDBConnector

logger = logging.getLogger("aat_wrest")
logger.setLevel(logging.INFO)
@@ -23,7 +23,11 @@ class WrestWorkflowMetadata:
    """

    def __init__(
        self,
        connection: MDDBConnector,
        spl: List[str] = None,
        fileset_id: str = None,
        sdm_id: str = None,
    ):
        self.logger = logging.getLogger("aat_wrest")
        self.conn = connection
@@ -130,6 +134,63 @@ class WrestWorkflowMetadata:
            self.conn.close()
        return make_json

    def wrest_restore_info(self) -> json:
        """
        Given an execution block science product locator and a calibration science product
        locator, return the metadata required to run the restore CMS workflow
        :return:
        """
        eb_sql = f"""
            SELECT ngas_fileset_id as filesetId,
                   e.project_code as projectCode,
                   p.title as title,
                   e.starttime as startTime,
                   (a.firstname || ' ' || a.lastname) as observer,
                   telescope as telescope
            FROM execution_blocks e
            JOIN projects p on e.project_code = p.project_code
            JOIN authors a on p.project_code = a.project_code
            WHERE science_product_locator = %(spl)s AND a.is_pi = true
        """
        cal_sql = f"""
            SELECT external_name as calSdmId
            FROM science_products WHERE science_product_locator = %(cal_spl)s
        """
        make_json = {}
        try:
            cursor = self.conn.cursor()
            cursor.execute(eb_sql, {"spl": self.spl[0]})
            data = cursor.fetchall()
            cursor.execute(cal_sql, {"cal_spl": self.spl[1]})
            cal_data = cursor.fetchall()
            if data and cal_data:
                make_json = json.dumps(
                    {
                        "sdmId": data[0][0],
                        "calSdmId": cal_data[0][0],
                        "projectCode": data[0][1],
                        "title": data[0][2],
                        "startTime": data[0][3],
                        "observer": data[0][4],
                        "telescope": data[0][5],
                        "created_at": str(
                            pendulum.now().in_timezone(TIME_ZONE).format(PENDULUM_FORMAT)
                        ),
                    }
                )
            else:
                self.logger.error(
                    f"ERROR: aat-wrest query returned no results!"
                    f" The database appears to be missing information for either SPL {self.spl[0]}"
                    f" or Cal SPL {self.spl[1]}!"
                )
        finally:
            self.conn.close()
        return make_json

    def wrest_obs_metadata_from_fileset_id(self, fileset_id: str) -> str:
        """
        Given a fileset_id, query the Metadata DB and return the corresponding science_product_locator
@@ -182,6 +243,14 @@ def parser() -> argparse.ArgumentParser:
        required=False,
        help="Find workflow metadata for standard CMS imaging with provided SDM id",
    )
    arg_parser.add_argument(
        "-r",
        "--restore",
        nargs="+",
        default=[],
        required=False,
        help="Find workflow metadata for Restores with provided EB product locator and Cal product locator",
    )
    arg_parser.add_argument(
        "-obs",
        "--observation",
@@ -198,6 +267,9 @@ def determine_wrester(connection: MDDBConnector, args: argparse.Namespace):
        data = WrestWorkflowMetadata(connection, spl=args.stdcals[0]).wrest_standard_cal_info()
    if args.stdimg:
        data = WrestWorkflowMetadata(connection, sdm_id=args.stdimg[0]).wrest_standard_image_info()
    if args.restore:
        data = WrestWorkflowMetadata(connection, spl=args.restore).wrest_restore_info()
    if args.observation:
        data = ObservationWrester(connection, spl=args.observation[0]).wrest_observation_info()
```
"""add restore capabilities
Revision ID: 7200d0d19938
Revises: a7c2b4682aae
Create Date: 2021-07-13 09:51:45.729067
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "7200d0d19938"
down_revision = "a7c2b4682aae"
branch_labels = None
depends_on = None
condor_content = """executable = restore_cms.sh
arguments = {{product_locator}} {{cal_product_locator}} {{request_id}} metadata.json PPR.xml
output = restore.out
error = restore.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json
transfer_output_files = working, rawdata, products
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
script_content = """#!/bin/sh
export HOME=$TMPDIR
set -o errexit
./framework.sh -d .
chmod 770 .
cd rawdata/
../productfetcher --product-locator $1 $2
cd ../
./casa_envoy --restore -c $4 $5
cp ./working/{{sdmId}}.ms ./products/
./deliver -p ./products --prefix $3
"""
metadata_content = """{
"fileSetIds": ["{{sdmId}}", "{{calSdmId}}"]
"workflowName": "std_calibration",
"systemId": "{{request_id}}",
"creationTime": "{{created_at}}",
"productLocator": "{{product_locator}}",
"calProductLocator": "{{cal_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}"
}
"""
ppr_content = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ns2:SciPipeRequest xmlns:ns2="Common/pipelinescience/SciPipeRequest">
<ProjectSummary>
<ProposalCode>VLA/null</ProposalCode>
<Observatory>NRAO</Observatory>
<Telescope>VLA</Telescope>
<ProcessingSite>Socorro</ProcessingSite>
<Operator>vlapipe</Operator>
<Mode>SCIENCE</Mode>
<Version>NGRH-ALMA-10_8</Version>
<CreationTime>{{created_at}}</CreationTime>
</ProjectSummary>
<ProjectStructure>TBD</ProjectStructure>
<ProcessingRequests>
<RootDirectory>{{root_directory}}</RootDirectory>
<ProcessingRequest>
<ProcessingIntents>
<Intents>
<Keyword>VLA_INTERFEROMETRY_STANDARD_OBSERVING_MODE</Keyword>
<Value>Undefined</Value>
</Intents>
</ProcessingIntents>
<ProcessingProcedure>
<ProcedureTitle>Workspaces Restore</ProcedureTitle>
<ProcessingCommand>
<Command>hifv_restoredata</Command>
<ParameterSet/>
</ProcessingCommand>
<ProcessingCommand>
<Command>hifv_statwt</Command>
<ParameterSet/>
</ProcessingCommand>
</ProcessingProcedure>
<DataSet>
<RelativePath>{{relative_path}}</RelativePath>
<SdmIdentifier>{{sdmId}}</SdmIdentifier>
<DataType>asdm</DataType>
</DataSet>
</ProcessingRequest>0
</ProcessingRequests>
<ResultsProcessing>
<ArchiveResults>false</ArchiveResults>
<CleanUpDisk>false</CleanUpDisk>
<UpdateProjectLifeCycle>false</UpdateProjectLifeCycle>
<NotifyOperatorWhenDone>false</NotifyOperatorWhenDone>
<SDMall>false</SDMall>
<SDMonly>false</SDMonly>
<PipelineOperatorAddress>Unknown</PipelineOperatorAddress>
</ResultsProcessing>
</ns2:SciPipeRequest>
"""
def upgrade():
restore_steps = """prepare-and-run-workflow restore_cms
await-workflow
await-qa
"""
op.execute(
f"""
INSERT INTO capabilities (capability_name, capability_steps, max_jobs)
VALUES ('restore_cms', '{restore_steps}', 20)
"""
)
op.execute(
"""
INSERT INTO workflows (workflow_name) VALUES ('restore_cms')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('restore_cms.condor', E'{condor_content}', 'restore_cms')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('restore_cms.sh', E'{script_content}', 'restore_cms')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('metadata.json', E'{metadata_content}', 'restore_cms')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('PPR.xml', E'{ppr_content}', 'restore_cms')
"""
)
def downgrade():
op.execute(
"""
DELETE FROM capabilities WHERE capability_name = 'restore_cms'
"""
)
op.execute(
"""
DELETE FROM workflows WHERE workflow_name = 'restore_cms'
"""
)
op.execute(
"""
DELETE FROM workflow_templates WHERE workflow_name = 'restore_cms'
"""
)
"""empty message
Revision ID: a70e3e5d5bf3
Revises: 0bca0d8b3564, 7200d0d19938
Create Date: 2021-07-15 10:22:22.651390
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a70e3e5d5bf3'
down_revision = ('0bca0d8b3564', '7200d0d19938')
branch_labels = None
depends_on = None
def upgrade():
pass
def downgrade():
pass
Workspaces workflow service (metadata wresting dispatch):
```
@@ -235,6 +235,9 @@ class WorkflowService(WorkflowServiceIF):
            if "calibration" in name:
                wrest_type = "-sc"
                argument = wf_request.argument["product_locator"]
            elif "restore" in name:
                wrest_type = "-r"
                argument = [wf_request.argument["product_locator"], wf_request.argument["cal_locator"]]
            elif "imaging" in name:
                wrest_type = "-si"
                argument = wf_request.argument["sdmId"]
```