From 06f2792a5d9f5c59040e2ad0b3be06efed451ce2 Mon Sep 17 00:00:00 2001
From: chausman <chausman@nrao.edu>
Date: Tue, 13 Jul 2021 16:58:12 -0600
Subject: [PATCH] WS-547 & WS-548: restore capability and workflow

---
 .../pexable/casa_envoy/casa_envoy/auditor.py  |  22 +++-
 .../casa_envoy/casa_envoy/foundation.py       |  58 ++++++++++
 .../casa_envoy/casa_envoy/interfaces.py       |  15 ++-
 .../casa_envoy/casa_envoy/launchers.py        |  37 ++++++-
 .../pexable/casa_envoy/casa_envoy/palaver.py  |  27 ++++-
 .../pexable/casa_envoy/test/restore.json      |  14 +++
 .../pexable/casa_envoy/test/test_auditor.py   |   2 +-
 .../casa_envoy/test/test_casa_envoy.py        |   2 +
 .../casa_envoy/test/test_foundation.py        |  49 +++++++++
 .../7200d0d19938_add_restore_capability.py    | 104 ++++++++++++++++++
 10 files changed, 318 insertions(+), 12 deletions(-)
 create mode 100644 apps/cli/executables/pexable/casa_envoy/casa_envoy/foundation.py
 create mode 100644 apps/cli/executables/pexable/casa_envoy/test/restore.json
 create mode 100644 apps/cli/executables/pexable/casa_envoy/test/test_foundation.py
 create mode 100644 schema/versions/7200d0d19938_add_restore_capability.py

diff --git a/apps/cli/executables/pexable/casa_envoy/casa_envoy/auditor.py b/apps/cli/executables/pexable/casa_envoy/casa_envoy/auditor.py
index f6c85c0d2..77591822e 100644
--- a/apps/cli/executables/pexable/casa_envoy/casa_envoy/auditor.py
+++ b/apps/cli/executables/pexable/casa_envoy/casa_envoy/auditor.py
@@ -34,6 +34,10 @@ def get_fields_for(product_type: str, filename: str) -> list:
         "sdmId",
     ]
 
+    restore_metadata_list = [
+        "calProductLocator",
+    ]
+
     ppr_list = ["RootDirectory", "RelativePath", "SdmIdentifier"]
 
     if ".xml" in filename:
@@ -43,6 +47,8 @@ def get_fields_for(product_type: str, filename: str) -> list:
         return cal_metadata_list
     elif ".json" in filename and "img" in product_type:
         return img_metadata_list
+    elif ".json" in filename and product_type == "restore":
+        return cal_metadata_list + restore_metadata_list
 
 
 def get_xml_content(file: AbstractTextFile):
@@ -126,10 +132,12 @@ class AuditFiles(AuditorIF):
 class AuditDirectories(AuditorIF):
     def __init__(self, ppr: AbstractTextFile, settings: Dict[str, str]):
         self.logger = logging.getLogger("casa_envoy")
+        self.parameters = settings
         self.rootDirectory = settings["rootDirectory"]
         self.relative_path = settings["processingDirectory"]
         self.sdmId = get_value_for(ppr, "SdmIdentifier")
 
+    @property
     def audit(self) -> bool:
         current = os.getcwd()
         needed = self.rootDirectory + "/" + self.relative_path
@@ -146,11 +154,21 @@ class AuditDirectories(AuditorIF):
                 data = os.listdir(Path(current + "/rawdata/"))
                 if len(data) > 0:
                     self.logger.info("Data is available. Proceeding...")
+
+                    if self.parameters["product_type"] == "restore":  # equality, not identity
+                        self.logger.info("Checking products/ for calibration tables...")
+                        cal_data = os.listdir(Path(current + "/products/"))
+                        if len(cal_data) > 0:
+                            self.logger.info("Calibration data is available. Proceeding...")
+                        else:
+                            self.logger.error("FAILURE: calibration data not found in products/")
+                            return False
+
                     return True
                 else:
-                    self.logger.info("FAILURE: data not found in rawdata/")
+                    self.logger.error("FAILURE: data not found in rawdata/")
                     return False
             else:
-                self.logger.info(
+                self.logger.error(
                     "DIRECTORY ERROR: A directory is missing from the processing root directory."
) diff --git a/apps/cli/executables/pexable/casa_envoy/casa_envoy/foundation.py b/apps/cli/executables/pexable/casa_envoy/casa_envoy/foundation.py new file mode 100644 index 000000000..52c45b5a5 --- /dev/null +++ b/apps/cli/executables/pexable/casa_envoy/casa_envoy/foundation.py @@ -0,0 +1,58 @@ +""" +Classes and methods for laying the data location foundation for various types of CASA processing +""" +import logging +import os +import tarfile +from pathlib import Path + +import json + +from casa_envoy.schema import AbstractTextFile + +from casa_envoy.interfaces import FoundationIF + + +class RestoreFoundation(FoundationIF): + def __init__(self, parameters: dict, metadata: AbstractTextFile): + self.logger = logging.getLogger("casa_envoy") + self.parameters = parameters + self.parent_path = parameters["parent_path"] + self.metadata = metadata + + def data_foundation(self): + """ + CMS Restore requires two inputs: An EB and a Calibration for that EB + After download, all data is in rawdata and the calibrations tables are contained + in a tar file. We need to extract all the calibration tables to the products directory + for CASA processing. 
+ :return: + """ + + self.logger.info("LAYING DATA FOUNDATION...") + # ensure we're starting from the parent directory + os.chdir(self.parent_path) + self.extract_cal() + self.set_permissions() + self.logger.info("DATA FOUNDATION COMPLETE!") + + def extract_cal(self): + self.logger.info("Extracting calibration tar file to products directory...") + cal_name = json.loads(self.metadata.content)["fileSetIds"][1] + cal_path = "./rawdata" + cal_name + if Path(cal_path).exists(): + calibration = tarfile.open(cal_path) + # extract to products + calibration.extractall(path="./products") + calibration.close() + else: + self.logger.error(f"ERROR: calibration tar file {cal_name} not found in rawdata!") + + def set_permissions(self): + self.logger.info("Ensuring correct file permissions....") + path = Path("./products") + for root, dirs, files in os.walk(path): + for d in dirs: + os.chmod(os.path.join(root, d), 0o755) + for f in files: + os.chmod(os.path.join(root, f), 0o755) diff --git a/apps/cli/executables/pexable/casa_envoy/casa_envoy/interfaces.py b/apps/cli/executables/pexable/casa_envoy/casa_envoy/interfaces.py index 6721bdb07..a3967389a 100644 --- a/apps/cli/executables/pexable/casa_envoy/casa_envoy/interfaces.py +++ b/apps/cli/executables/pexable/casa_envoy/casa_envoy/interfaces.py @@ -26,5 +26,18 @@ class AuditorIF(ABC): Generic functionality implementation for auditor classes """ + @abc.abstractmethod def audit(self): - raise NotImplementedError + pass + + +class FoundationIF(ABC): + """ + Generic Foundation methods + Should be implemented for any type of CASA processing that requires initial data placement + in locations other than rawdata after download + """ + + @abc.abstractmethod + def data_foundation(self): + pass diff --git a/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py b/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py index feb570ecb..0cc7db7b2 100644 --- a/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py +++ 
b/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py @@ -9,6 +9,7 @@ from typing import Dict import json from casa_envoy.auditor import AuditFiles, AuditDirectories +from casa_envoy.foundation import RestoreFoundation from casa_envoy.interfaces import LauncherIF from casa_envoy.schema import AbstractTextFile @@ -102,23 +103,49 @@ class CalibrationLauncher(LauncherIF): self.metadata = get_abs_file(parameters.get("metadata")) def launch_casa(self): - if self.check_calibratable(): + if self.parameters["product_type"] == "restore": + check_input = self.check_restorable() + else: + check_input = self.check_calibratable() + + if check_input: + self.prepare_for_casa() self.run_audit(self.parameters) CasaLauncher(self.parameters).run() else: - self.logger.error("ERROR: Provided SPL is not type execution block!") + self.logger.error("ERROR: Provided SPL(s) are not correct type(s) for processing!") sys.exit(1) + def prepare_for_casa(self): + # Ensure all data is in the required locations for CASA processing (This is not always rawdata!) + if self.parameters["product_type"] == "restore": + RestoreFoundation(self.parameters, self.metadata).data_foundation() + + return + def check_calibratable(self) -> bool: spl = self.metadata.content["productLocator"] if "execblock" in spl: return True else: - self.logger.info("SPL ERROR: This product locator is not calibratable!") + self.logger.error( + "SPL ERROR: This product locator is not calibratable! Please check your inputs" + ) return False + def check_restorable(self) -> bool: + spl = self.metadata.content["productLocator"] + cal_spl = self.metadata.content["calProductLocator"] + + if "execblock" in spl and "calibration" in cal_spl: + return True + else: + self.logger.error( + "SPL ERROR: This set of product locators are not restorable! 
Please check your inputs" + ) + def run_audit(self, parameters: Dict[str, str]): - dir_audit = AuditDirectories(self.ppr, parameters).audit() + dir_audit = AuditDirectories(self.ppr, parameters).audit if dir_audit: self.logger.info("Directory audit successful!") else: @@ -159,7 +186,7 @@ class ImagingLauncher(LauncherIF): return False def run_audit(self, parameters: Dict[str, str]): - dir_audit = AuditDirectories(self.ppr, parameters).audit() + dir_audit = AuditDirectories(self.ppr, parameters).audit if dir_audit: self.logger.info("Directory audit successful!") else: diff --git a/apps/cli/executables/pexable/casa_envoy/casa_envoy/palaver.py b/apps/cli/executables/pexable/casa_envoy/casa_envoy/palaver.py index 8be5536d8..4e153d605 100644 --- a/apps/cli/executables/pexable/casa_envoy/casa_envoy/palaver.py +++ b/apps/cli/executables/pexable/casa_envoy/casa_envoy/palaver.py @@ -50,6 +50,7 @@ def arg_parser() -> argparse.ArgumentParser: formatter_class=argparse.RawTextHelpFormatter, ) parser.add_argument( + "-c", "--standard-cal", nargs=2, action="store", @@ -57,23 +58,40 @@ def arg_parser() -> argparse.ArgumentParser: help="run the standard calibration CASA pipeline", ) parser.add_argument( + "-i", "--standard-img", nargs=2, action="store", required=False, - help="run the standard calibration CASA pipeline", + help="run the standard imaging CASA pipeline", + ) + parser.add_argument( + "--restore", + required=False, + action="store_true", + help="run the restore measurement set CASA pipeline", + ) + parser.add_argument( + "--integrated", + required=False, + action="store_true", + help="run an integrated calibration-imaging pipeline", ) return parser def main(): args = arg_parser().parse_args() + print(args) path = os.getcwd() if args.standard_cal is not None: parameters = _get_settings(pathlib.Path(path), args.standard_cal) - parameters["product_type"] = "standard-cal" + if args.restore: + parameters["product_type"] = "restore" + else: + parameters["product_type"] = 
"standard-cal" CalibrationLauncher(parameters).launch_casa() # make sure we return to the parent directory after processing @@ -81,7 +99,10 @@ def main(): elif args.standard_img is not None: parameters = _get_settings(pathlib.Path(path), args.standard_img) - parameters["product_type"] = "standard-img" + if args.integrated: + parameters["product_type"] = "integrated" + else: + parameters["product_type"] = "standard-img" ImagingLauncher(parameters).launch_casa() # return to parent directory after processing diff --git a/apps/cli/executables/pexable/casa_envoy/test/restore.json b/apps/cli/executables/pexable/casa_envoy/test/restore.json new file mode 100644 index 000000000..d77cc9cf5 --- /dev/null +++ b/apps/cli/executables/pexable/casa_envoy/test/restore.json @@ -0,0 +1,14 @@ +{ + "fileSetIds": ["brain_000.58099.67095825232", "calibration.tar"], + "workflowName": "restore_cms", + "systemId": "2", + "productLocator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36", + "calProductLocator": "uid://evla/calibration/ec082e65-452d-4fec-ad88-f5b4af1f9e36", + "projectMetadata": { + "projectCode": "Operations", + "title": "", + "startTime": "58099.6710792824", + "observer": "VLA Operations" + }, + "destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpabcd1234" +} diff --git a/apps/cli/executables/pexable/casa_envoy/test/test_auditor.py b/apps/cli/executables/pexable/casa_envoy/test/test_auditor.py index 87b69ed65..cc896a22b 100644 --- a/apps/cli/executables/pexable/casa_envoy/test/test_auditor.py +++ b/apps/cli/executables/pexable/casa_envoy/test/test_auditor.py @@ -147,5 +147,5 @@ class TestAuditFiles: class TestAuditDirectories: def test_audit(self): - result = AuditDirectories(test_ppr, cal_settings).audit() + result = AuditDirectories(test_ppr, cal_settings).audit assert result is False diff --git a/apps/cli/executables/pexable/casa_envoy/test/test_casa_envoy.py b/apps/cli/executables/pexable/casa_envoy/test/test_casa_envoy.py index 
c0ff840ec..e28c7391e 100644 --- a/apps/cli/executables/pexable/casa_envoy/test/test_casa_envoy.py +++ b/apps/cli/executables/pexable/casa_envoy/test/test_casa_envoy.py @@ -22,6 +22,7 @@ args = argparse.Namespace() class TestPalaver: def test_get_settings(self): args.standard_cal = ["test/test.json", "test/PPR.xml"] + args.restore = False with patch( "pathlib.Path.cwd", @@ -41,6 +42,7 @@ class TestPalaver: @patch("os.getcwd") def test_main_cal(self, mock_cwd, mock_chdir): args.standard_cal = ["test/test.json", "test/PPR.xml"] + args.integrated = False with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args: with patch("casa_envoy.launchers.CalibrationLauncher.launch_casa") as cal_launcher: diff --git a/apps/cli/executables/pexable/casa_envoy/test/test_foundation.py b/apps/cli/executables/pexable/casa_envoy/test/test_foundation.py new file mode 100644 index 000000000..2482d2276 --- /dev/null +++ b/apps/cli/executables/pexable/casa_envoy/test/test_foundation.py @@ -0,0 +1,49 @@ +""" +Tests for casa_envoy.foundation +""" +from pathlib import Path +from unittest.mock import patch + +from casa_envoy.schema import AbstractTextFile + +from casa_envoy.foundation import RestoreFoundation + + +parameters = expected_settings = { + "useCasa": False, + "homeForReprocessing": "/home/casa/packages/pipeline/current", + "rootDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool", + "processingDirectory": "tmpo1ca1pp_", + "parent_path": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_", + "metadata": "test/restore.json", + "ppr": "test/PPR.xml", + "product_type": "restore", +} + +test_restore_metadata = AbstractTextFile( + filename="test/restore.json", content=Path("test/restore.json").read_text() +) + +foundation = RestoreFoundation(parameters=parameters, metadata=test_restore_metadata) + + +class TestRestoreFoundation: + @patch("os.chdir") + def test_data_foundation(self, mock_chdir): + with 
patch("casa_envoy.foundation.RestoreFoundation.extract_cal") as extract: + with patch("casa_envoy.foundation.RestoreFoundation.set_permissions") as permissions: + foundation.data_foundation() + assert mock_chdir.call_count == 1 + + @patch("pathlib.Path.exists", return_value=True) + @patch("tarfile.open") + def test_extract_cal(self, mock_tar, mock_path): + foundation.extract_cal() + assert mock_tar.call_count == 1 + + @patch("os.path.join") + @patch("os.chmod") + @patch("os.walk") + def test_set_permissions(self, mock_walk, mock_chmod, mock_join): + foundation.set_permissions() + assert mock_walk.call_count == 1 diff --git a/schema/versions/7200d0d19938_add_restore_capability.py b/schema/versions/7200d0d19938_add_restore_capability.py new file mode 100644 index 000000000..2acff2020 --- /dev/null +++ b/schema/versions/7200d0d19938_add_restore_capability.py @@ -0,0 +1,104 @@ +"""add restore capabilities + +Revision ID: 7200d0d19938 +Revises: a7c2b4682aae +Create Date: 2021-07-13 09:51:45.729067 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision = "7200d0d19938"
+down_revision = "a7c2b4682aae"
+branch_labels = None
+depends_on = None
+
+restore_condor_content = """executable = restore_cms.sh
+arguments = {{product_locator}} {{cal_product_locator}} {{request_id}} metadata.json PPR.xml
+
+output = restore.out
+error = restore.err
+log = condor.log
+
+SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
+should_transfer_files = yes
+transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json
+transfer_output_files = working, rawdata, products
+
+request_memory = {{ramInGb}}
+getenv = True
+environment = "CAPO_PATH=/home/casa/capo"
+
+queue
+
+"""
+
+restore_script_content = """#!/bin/sh
+
+export HOME=$TMPDIR
+
+set -o errexit
+./framework.sh -d .
+chmod 770 .
+cd rawdata/
+../productfetcher --product-locator $1 $2
+cd ../
+./casa_envoy --restore -c $4 $5
+./deliver -p . --prefix $3
+
+"""
+
+
+def upgrade():
+    restore_steps = """prepare-and-run-workflow restore_cms
+await-workflow
+await-qa
+    """
+
+    op.execute(
+        f"""
+        INSERT INTO capabilities (capability_name, capability_steps, max_jobs)
+        VALUES ('restore_cms', '{restore_steps}', 20)
+        """
+    )
+
+    op.execute(
+        """
+        INSERT INTO workflows (workflow_name) VALUES ('restore_cms')
+        """
+    )
+
+    op.execute(
+        f"""
+        INSERT INTO workflow_templates (filename, content, workflow_name)
+        VALUES ('restore_cms.condor', E'{restore_condor_content}', 'restore_cms')
+        """
+    )
+    op.execute(
+        f"""
+        INSERT INTO workflow_templates (filename, content, workflow_name)
+        VALUES ('restore_cms.sh', E'{restore_script_content}', 'restore_cms')
+        """
+    )
+
+
+def downgrade():
+    # Undo upgrade() in reverse order: workflow_templates rows carry the workflow_name
+    # (presumably a foreign key -- confirm against the schema), so they go first.
+    op.execute(
+        """
+        DELETE FROM workflow_templates WHERE workflow_name = 'restore_cms'
+        """
+    )
+    op.execute(
+        """
+        DELETE FROM workflows WHERE workflow_name = 'restore_cms'
+        """
+    )
+    op.execute(
+        """
+        DELETE FROM capabilities WHERE capability_name = 'restore_cms'
+        """
+    )
-- 
GitLab