Commit 06f2792a authored by Charlotte Hausman

WS-547 & WS-548: restore capability and workflow

parent 6165066b
1 merge request: !340 restore capability and workflow
Showing 318 additions and 12 deletions
@@ -34,6 +34,10 @@ def get_fields_for(product_type: str, filename: str) -> list:
"sdmId",
]
restore_metadata_list = [
"calProductLocator",
]
ppr_list = ["RootDirectory", "RelativePath", "SdmIdentifier"]
if ".xml" in filename:
@@ -43,6 +47,8 @@ def get_fields_for(product_type: str, filename: str) -> list:
return cal_metadata_list
elif ".json" in filename and "img" in product_type:
return img_metadata_list
elif ".json" in filename and product_type == "restore":
return cal_metadata_list + restore_metadata_list
def get_xml_content(file: AbstractTextFile):
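For restore requests the JSON metadata must now carry the calibration fields plus the new calProductLocator entry. A minimal usage sketch (illustration only, not code from this commit; it assumes get_fields_for lives in casa_envoy.auditor):

# Illustration only
from casa_envoy.auditor import get_fields_for

fields = get_fields_for(product_type="restore", filename="metadata.json")
assert "calProductLocator" in fields  # cal_metadata_list + restore_metadata_list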
@@ -126,10 +132,12 @@ class AuditFiles(AuditorIF):
class AuditDirectories(AuditorIF):
def __init__(self, ppr: AbstractTextFile, settings: Dict[str, str]):
self.logger = logging.getLogger("casa_envoy")
self.parameters = settings
self.rootDirectory = settings["rootDirectory"]
self.relative_path = settings["processingDirectory"]
self.sdmId = get_value_for(ppr, "SdmIdentifier")
@property
def audit(self) -> bool:
current = os.getcwd()
needed = self.rootDirectory + "/" + self.relative_path
@@ -146,11 +154,21 @@ class AuditDirectories(AuditorIF):
data = os.listdir(Path(current + "/rawdata/"))
if len(data) > 0:
self.logger.info("Data is available. Proceeding...")
if self.parameters["product_type"] == "restore":
self.logger.info("Checking products/ for calibration tables...")
cal_data = os.listdir(Path(current + "/products/"))
if len(cal_data) > 0:
self.logger.info("Calibration data is available. Proceeding...")
else:
self.logger.error("FAILURE: calibration data not found in products/")
return False
return True
else:
self.logger.info("FAILURE: data not found in rawdata/")
self.logger.error("FAILURE: data not found in rawdata/")
return False
else:
- self.logger.info(
+ self.logger.error(
"DIRECTORY ERROR: A directory is missing from the processing root directory."
)
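Because audit is now a property, call sites read it as an attribute rather than invoking a method, which is why the .audit() calls later in this commit drop their parentheses. A minimal sketch, not part of the commit:

# Sketch only: audit is accessed as a property after this change.
from casa_envoy.auditor import AuditDirectories

def directories_ok(ppr, settings: dict) -> bool:
    # ppr is assumed to be the AbstractTextFile wrapping PPR.xml
    return AuditDirectories(ppr, settings).audit  # note: no parentheses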
"""
Classes and methods for laying the data location foundation for various types of CASA processing
"""
import logging
import os
import tarfile
from pathlib import Path
import json
from casa_envoy.schema import AbstractTextFile
from casa_envoy.interfaces import FoundationIF
class RestoreFoundation(FoundationIF):
def __init__(self, parameters: dict, metadata: AbstractTextFile):
self.logger = logging.getLogger("casa_envoy")
self.parameters = parameters
self.parent_path = parameters["parent_path"]
self.metadata = metadata
def data_foundation(self):
"""
CMS Restore requires two inputs: an EB and a calibration for that EB.
After download, all data is in rawdata and the calibration tables are contained
in a tar file. We need to extract all the calibration tables to the products directory
for CASA processing.
:return:
"""
self.logger.info("LAYING DATA FOUNDATION...")
# ensure we're starting from the parent directory
os.chdir(self.parent_path)
self.extract_cal()
self.set_permissions()
self.logger.info("DATA FOUNDATION COMPLETE!")
def extract_cal(self):
self.logger.info("Extracting calibration tar file to products directory...")
cal_name = json.loads(self.metadata.content)["fileSetIds"][1]
cal_path = "./rawdata/" + cal_name
if Path(cal_path).exists():
calibration = tarfile.open(cal_path)
# extract to products
calibration.extractall(path="./products")
calibration.close()
else:
self.logger.error(f"ERROR: calibration tar file {cal_name} not found in rawdata!")
def set_permissions(self):
self.logger.info("Ensuring correct file permissions....")
path = Path("./products")
for root, dirs, files in os.walk(path):
for d in dirs:
os.chmod(os.path.join(root, d), 0o755)
for f in files:
os.chmod(os.path.join(root, f), 0o755)
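A hedged usage sketch of the new foundation step; the parameter values are illustrative, and in practice the dictionary is built from the CLI settings shown later in this commit:

# Sketch only: drive RestoreFoundation directly with illustrative values.
from pathlib import Path
from casa_envoy.schema import AbstractTextFile
from casa_envoy.foundation import RestoreFoundation

parameters = {"parent_path": "/path/to/processing_dir", "product_type": "restore"}
metadata = AbstractTextFile(
    filename="metadata.json",
    content=Path("metadata.json").read_text(),  # JSON whose fileSetIds[1] is the cal tar
)

# chdir to parent_path, untar fileSetIds[1] from rawdata/ into products/,
# then normalise permissions under products/.
RestoreFoundation(parameters, metadata).data_foundation()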
@@ -26,5 +26,18 @@ class AuditorIF(ABC):
Generic functionality implementation for auditor classes
"""
@abc.abstractmethod
def audit(self):
- raise NotImplementedError
+ pass
class FoundationIF(ABC):
"""
Generic Foundation methods
Should be implemented for any type of CASA processing that requires initial data placement
in locations other than rawdata after download
"""
@abc.abstractmethod
def data_foundation(self):
pass
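Any other processing type that needs data staged somewhere other than rawdata/ after download would implement this interface. A hypothetical example, invented for illustration and not part of this commit:

# Hypothetical FoundationIF implementation, for illustration only.
import logging

from casa_envoy.interfaces import FoundationIF


class NoOpFoundation(FoundationIF):
    """Example foundation that needs no extra data placement."""

    def __init__(self, parameters: dict):
        self.logger = logging.getLogger("casa_envoy")
        self.parameters = parameters

    def data_foundation(self):
        self.logger.info("No additional data placement required.")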
@@ -9,6 +9,7 @@ from typing import Dict
import json
from casa_envoy.auditor import AuditFiles, AuditDirectories
from casa_envoy.foundation import RestoreFoundation
from casa_envoy.interfaces import LauncherIF
from casa_envoy.schema import AbstractTextFile
@@ -102,23 +103,49 @@ class CalibrationLauncher(LauncherIF):
self.metadata = get_abs_file(parameters.get("metadata"))
def launch_casa(self):
- if self.check_calibratable():
+ if self.parameters["product_type"] == "restore":
+ check_input = self.check_restorable()
+ else:
+ check_input = self.check_calibratable()
+ if check_input:
self.prepare_for_casa()
self.run_audit(self.parameters)
CasaLauncher(self.parameters).run()
else:
self.logger.error("ERROR: Provided SPL is not type execution block!")
self.logger.error("ERROR: Provided SPL(s) are not correct type(s) for processing!")
sys.exit(1)
def prepare_for_casa(self):
# Ensure all data is in the required locations for CASA processing (This is not always rawdata!)
if self.parameters["product_type"] == "restore":
RestoreFoundation(self.parameters, self.metadata).data_foundation()
return
def check_calibratable(self) -> bool:
spl = self.metadata.content["productLocator"]
if "execblock" in spl:
return True
else:
self.logger.info("SPL ERROR: This product locator is not calibratable!")
self.logger.error(
"SPL ERROR: This product locator is not calibratable! Please check your inputs"
)
return False
def check_restorable(self) -> bool:
spl = self.metadata.content["productLocator"]
cal_spl = self.metadata.content["calProductLocator"]
if "execblock" in spl and "calibration" in cal_spl:
return True
else:
self.logger.error(
"SPL ERROR: This set of product locators are not restorable! Please check your inputs"
)
return False
def run_audit(self, parameters: Dict[str, str]):
- dir_audit = AuditDirectories(self.ppr, parameters).audit()
+ dir_audit = AuditDirectories(self.ppr, parameters).audit
if dir_audit:
self.logger.info("Directory audit successful!")
else:
@@ -159,7 +186,7 @@ class ImagingLauncher(LauncherIF):
return False
def run_audit(self, parameters: Dict[str, str]):
- dir_audit = AuditDirectories(self.ppr, parameters).audit()
+ dir_audit = AuditDirectories(self.ppr, parameters).audit
if dir_audit:
self.logger.info("Directory audit successful!")
else:
......
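Condensed, the restore path through CalibrationLauncher.launch_casa now reads: validate both locators, lay the data foundation, audit, then run CASA. An illustrative sketch (it would launch the real pipeline; the file names and parent_path are assumptions):

# Sketch only: launching a restore.
from casa_envoy.launchers import CalibrationLauncher

parameters = {
    "product_type": "restore",
    "metadata": "metadata.json",  # must contain productLocator and calProductLocator
    "ppr": "PPR.xml",
    "parent_path": "/path/to/processing_dir",  # used by RestoreFoundation
}

# check_restorable() validates the locators, prepare_for_casa() lays the data
# foundation, run_audit() checks directories and files, then CasaLauncher runs.
CalibrationLauncher(parameters).launch_casa()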
@@ -50,6 +50,7 @@ def arg_parser() -> argparse.ArgumentParser:
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"-c",
"--standard-cal",
nargs=2,
action="store",
@@ -57,23 +58,40 @@ def arg_parser() -> argparse.ArgumentParser:
help="run the standard calibration CASA pipeline",
)
parser.add_argument(
"-i",
"--standard-img",
nargs=2,
action="store",
required=False,
help="run the standard calibration CASA pipeline",
help="run the standard imaging CASA pipeline",
)
parser.add_argument(
"--restore",
required=False,
action="store_true",
help="run the restore measurement set CASA pipeline",
)
parser.add_argument(
"--integrated",
required=False,
action="store_true",
help="run an integrated calibration-imaging pipeline",
)
return parser
def main():
args = arg_parser().parse_args()
print(args)
path = os.getcwd()
if args.standard_cal is not None:
parameters = _get_settings(pathlib.Path(path), args.standard_cal)
parameters["product_type"] = "standard-cal"
if args.restore:
parameters["product_type"] = "restore"
else:
parameters["product_type"] = "standard-cal"
CalibrationLauncher(parameters).launch_casa()
# make sure we return to the parent directory after processing
@@ -81,7 +99,10 @@ def main():
elif args.standard_img is not None:
parameters = _get_settings(pathlib.Path(path), args.standard_img)
parameters["product_type"] = "standard-img"
if args.integrated:
parameters["product_type"] = "integrated"
else:
parameters["product_type"] = "standard-img"
ImagingLauncher(parameters).launch_casa()
# return to parent directory after processing
......
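The restore_cms.sh template added further down invokes the envoy as ./casa_envoy --restore -c metadata.json PPR.xml. A small sketch of how those flags parse, assuming the CLI module is casa_envoy.palaver as the test class name suggests:

# Sketch only: flag combination used by the generated restore_cms.sh script.
from casa_envoy.palaver import arg_parser

args = arg_parser().parse_args(["--restore", "-c", "metadata.json", "PPR.xml"])
assert args.restore is True
assert args.standard_cal == ["metadata.json", "PPR.xml"]
# main() then sets parameters["product_type"] = "restore" before launching.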
{
"fileSetIds": ["brain_000.58099.67095825232", "calibration.tar"],
"workflowName": "restore_cms",
"systemId": "2",
"productLocator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
"calProductLocator": "uid://evla/calibration/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
"projectMetadata": {
"projectCode": "Operations",
"title": "",
"startTime": "58099.6710792824",
"observer": "VLA Operations"
},
"destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpabcd1234"
}
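The sample metadata above exercises every field the restore path touches. A short sketch of those reads, using the test file path:

# Sketch only: fields consumed by check_restorable() and extract_cal().
import json
from pathlib import Path

meta = json.loads(Path("test/restore.json").read_text())
eb_locator = meta["productLocator"]      # must contain "execblock"
cal_locator = meta["calProductLocator"]  # must contain "calibration"
cal_tar = meta["fileSetIds"][1]          # tar file extracted into products/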
@@ -147,5 +147,5 @@ class TestAuditFiles:
class TestAuditDirectories:
def test_audit(self):
- result = AuditDirectories(test_ppr, cal_settings).audit()
+ result = AuditDirectories(test_ppr, cal_settings).audit
assert result is False
@@ -22,6 +22,7 @@ args = argparse.Namespace()
class TestPalaver:
def test_get_settings(self):
args.standard_cal = ["test/test.json", "test/PPR.xml"]
args.restore = False
with patch(
"pathlib.Path.cwd",
@@ -41,6 +42,7 @@ class TestPalaver:
@patch("os.getcwd")
def test_main_cal(self, mock_cwd, mock_chdir):
args.standard_cal = ["test/test.json", "test/PPR.xml"]
args.integrated = False
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
with patch("casa_envoy.launchers.CalibrationLauncher.launch_casa") as cal_launcher:
......
"""
Tests for casa_envoy.foundation
"""
from pathlib import Path
from unittest.mock import patch
from casa_envoy.schema import AbstractTextFile
from casa_envoy.foundation import RestoreFoundation
parameters = expected_settings = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool",
"processingDirectory": "tmpo1ca1pp_",
"parent_path": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_",
"metadata": "test/restore.json",
"ppr": "test/PPR.xml",
"product_type": "restore",
}
test_restore_metadata = AbstractTextFile(
filename="test/restore.json", content=Path("test/restore.json").read_text()
)
foundation = RestoreFoundation(parameters=parameters, metadata=test_restore_metadata)
class TestRestoreFoundation:
@patch("os.chdir")
def test_data_foundation(self, mock_chdir):
with patch("casa_envoy.foundation.RestoreFoundation.extract_cal") as extract:
with patch("casa_envoy.foundation.RestoreFoundation.set_permissions") as permissions:
foundation.data_foundation()
assert mock_chdir.call_count == 1
@patch("pathlib.Path.exists", return_value=True)
@patch("tarfile.open")
def test_extract_cal(self, mock_tar, mock_path):
foundation.extract_cal()
assert mock_tar.call_count == 1
@patch("os.path.join")
@patch("os.chmod")
@patch("os.walk")
def test_set_permissions(self, mock_walk, mock_chmod, mock_join):
foundation.set_permissions()
assert mock_walk.call_count == 1
"""add restore capabilities
Revision ID: 7200d0d19938
Revises: a7c2b4682aae
Create Date: 2021-07-13 09:51:45.729067
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "7200d0d19938"
down_revision = "a7c2b4682aae"
branch_labels = None
depends_on = None
restore_condor_content = """executable = restore_cms.sh
arguments = {{product_locator}} {{cal_product_locator}} {{request_id}} metadata.json PPR.xml
output = restore.out
error = restore.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json
transfer_output_files = working, rawdata, products
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
restore_script_content = """#!/bin/sh
export HOME=$TMPDIR
set -o errexit
./framework.sh -d .
chmod 770 .
cd rawdata/
../productfetcher --product-locator $1 $2
cd ../
./casa_envoy --restore -c $4 $5
./deliver -p . --prefix $3
"""
def upgrade():
restore_steps = """prepare-and-run-workflow restore_cms
await-workflow
await-qa
"""
op.execute(
f"""
INSERT INTO capabilities (capability_name, capability_steps, max_jobs)
VALUES ('restore_cms', '{restore_steps}', 20)
"""
)
op.execute(
"""
INSERT INTO workflows (workflow_name) VALUES ('restore_cms')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('restore_cms.condor', E'{restore_condor_content}', 'restore_cms')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('restore_cms.sh', E'{restore_script_content}', 'restore_cms')
"""
)
def downgrade():
op.execute(
"""
DELETE FROM capabilities WHERE capability_name = 'restore_cms'
"""
)
op.execute(
"""
DELETE FROM workflows WHERE workflow_name = 'restore_cms'
"""
)
op.execute(
"""
DELETE FROM workflow_templates WHERE workflow_name = 'restore_cms'
"""
)
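A hedged way to spot-check the migration after upgrade(); the table and column names come from the INSERT statements above, while the connection URL is a placeholder:

# Sketch only: verify the restore_cms rows exist (placeholder connection URL).
import sqlalchemy as sa

engine = sa.create_engine("postgresql+psycopg2://user:password@localhost/workspaces")
with engine.connect() as conn:
    filenames = sorted(
        row[0]
        for row in conn.execute(
            sa.text(
                "SELECT filename FROM workflow_templates WHERE workflow_name = 'restore_cms'"
            )
        )
    )
assert filenames == ["restore_cms.condor", "restore_cms.sh"]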