Skip to content
Snippets Groups Projects
Commit 92df985a authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

imaging changes for casa_envoy and vela

parent 4976178b
No related branches found
No related tags found
1 merge request!325imaging changes for casa_envoy and vela
This commit is part of merge request !325. Comments created here will be created in the context of that merge request.
Showing
with 830 additions and 216 deletions
import logging
import re
import glob
import sys
import os
import subprocess
from casa_envoy.interfaces import CasaLauncherIF
from casa_envoy.schema import AbstractTextFile
def get_base_environment(parameters: dict):
os.environ["SCIPIPE_ROOTDIR"] = parameters["rootDirectory"]
os.environ["CASA_HOME"] = parameters["homeForReprocessing"]
os.environ["LANG"] = "en_US.UTF-8"
def check_processing_env(logger: logging.Logger):
logger.info("Checking processing environment:")
env1 = os.environ.get("SCIPIPE_ROOTDIR")
logger.info(f"SCIPIPE_ROOTDIR: {env1}")
env2 = os.environ.get("CASA_HOME")
logger.info(f"CASA_HOME: {env2}")
env3 = os.environ.get("PPR_FILENAME")
logger.info(f"PPR_FILENAME: {env3}")
env4 = os.environ.get("LANG")
logger.info(f"LANG: {env4}")
if "None" in [env1, env2, env3, env4]:
logger.info("Environment setup Failed!")
sys.exit(1)
else:
logger.info("Environment ready for processing")
class CalibrationLauncher(CasaLauncherIF):
def __init__(self, ppr: AbstractTextFile, metadata: AbstractTextFile):
self.logger = logging.getLogger("casa_envoy")
self.ppr = ppr
self.metadata = metadata
def run(self):
self.logger.info("RUNNING CASA CALIBRATION!")
os.chdir("./working")
result = subprocess.Popen(
"PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\" "
"${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c "
"${CASA_HOME}/pipeline/pipeline/runvlapipeline.py ${PPR_FILENAME} || true",
shell=True,
executable="/bin/bash",
stdout=sys.stdout,
stderr=sys.stderr,
)
return result.communicate()
def setup_environment(self, parameters: dict):
get_base_environment(parameters)
os.environ["PPR_FILENAME"] = str(self.ppr)
check_processing_env(self.logger)
def check_logs(self, parent_path: str):
self.logger.info("CHECKING CASA CALIBRATION LOGS!")
# make sure we are in the correct directory to find log file
if not os.getcwd().endswith("/working"):
os.chdir(parent_path + "/working")
casa_logs = glob.glob("casa-*.log")
for file in casa_logs:
if re.match("^.*SEVERE\sflagmanager.*$", open(file).read()):
self.logger.error("CASA ERROR!")
else:
self.logger.info("CASA Success!")
class ImagingLauncher(CasaLauncherIF):
def __init__(self, ppr: AbstractTextFile, metadata: AbstractTextFile):
self.logger = logging.getLogger("casa_envoy")
self.ppr = ppr
self.metadata = metadata
def run(self):
self.logger.info("RUNNING CASA IMAGING!")
os.chdir("./working")
result = subprocess.Popen(
"PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\" "
"${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c "
"${CASA_HOME}/pipeline/pipeline/runvlapipeline.py ${PPR_FILENAME} || true",
shell=True,
executable="/bin/bash",
stdout=sys.stdout,
stderr=sys.stderr,
)
return result.communicate()
def setup_environment(self, parameters: dict):
get_base_environment(parameters)
os.environ["PPR_FILENAME"] = str(self.ppr)
check_processing_env(self.logger)
def check_logs(self, parent_path: str):
self.logger.info("CHECKING CASA IMAGING LOGS!")
# make sure we are in the correct directory to find log file
if not os.getcwd().endswith("/working"):
os.chdir(parent_path + "/working")
casa_logs = glob.glob("casa-*.log")
for file in casa_logs:
if re.match("^.*SEVERE\sflagmanager.*$", open(file).read()):
self.logger.error("CASA ERROR!")
else:
self.logger.info("CASA Success!")
......@@ -5,21 +5,15 @@ palaver definition: an unnecessarily elaborate or complex procedure
import argparse
import logging
import pathlib
import re
import subprocess
import glob
import sys
import os
import json
from typing import Dict
from casa_envoy.auditor import AuditDirectories, AuditFiles
from casa_envoy.interfaces import CasaLauncherIF
from casa_envoy.schema import AbstractTextFile
from pycapo import CapoConfig
import sys
import os
from casa_envoy.auditor import AuditDirectories, AuditFiles
from casa_envoy.launchers import CalibrationLauncher, ImagingLauncher
"""
Launch CASA processing jobs via Workspaces
......@@ -30,60 +24,11 @@ logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
class CalibrationLauncher(CasaLauncherIF):
def __init__(self, ppr: AbstractTextFile, metadata: AbstractTextFile):
self.ppr = ppr
self.metadata = metadata
def run(self):
logger.info("RUNNING CASA!")
os.chdir("./working")
result = subprocess.Popen("PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\" "
"${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c "
"${CASA_HOME}/pipeline/pipeline/runvlapipeline.py ${PPR_FILENAME} || true",
shell=True, executable="/bin/bash", stdout=sys.stdout, stderr=sys.stderr)
return result.communicate()
def setup_environment(self, parameters: dict):
os.environ['SCIPIPE_ROOTDIR'] = parameters["rootDirectory"]
os.environ['CASA_HOME'] = parameters["homeForReprocessing"]
os.environ['PPR_FILENAME'] = str(self.ppr)
os.environ['LANG'] = "en_US.UTF-8"
logger.info("Checking processing environment:")
env1 = os.environ.get('SCIPIPE_ROOTDIR')
logger.info(f"SCIPIPE_ROOTDIR: {env1}")
env2 = os.environ.get('CASA_HOME')
logger.info(f"CASA_HOME: {env2}")
env3 = os.environ.get('PPR_FILENAME')
logger.info(f"PPR_FILENAME: {env3}")
env4 = os.environ.get('LANG')
logger.info(f"LANG: {env4}")
if 'None' in [env1, env2, env3, env4]:
logger.info("Environment setup Failed!")
sys.exit(1)
else:
logger.info("Environment ready for processing")
def check_logs(self, parent_path: str):
logger.info("CHECKING CASA LOGS!")
# make sure we are in the correct directory to find log file
if not os.getcwd().endswith("/working"):
os.chdir(parent_path + "/working")
casa_logs = glob.glob('casa-*.log')
for file in casa_logs:
if re.match("^.*SEVERE\sflagmanager.*$", open(file).read()):
logger.info("CASA ERROR!")
else:
logger.info("CASA Success!")
def _get_settings(cwd: pathlib.Path):
use_casa = CapoConfig().getboolean("edu.nrao.archive.workspaces.ProcessingSettings.useCasa")
casa_home = CapoConfig().settings("edu.nrao.archive.workflow.config.CasaVersions").homeForReprocessing
casa_home = (
CapoConfig().settings("edu.nrao.archive.workflow.config.CasaVersions").homeForReprocessing
)
root_dir = str(cwd.parent)
processing_dir = str(cwd.stem)
......@@ -92,29 +37,36 @@ def _get_settings(cwd: pathlib.Path):
"useCasa": use_casa,
"homeForReprocessing": casa_home,
"rootDirectory": root_dir,
"processingDirectory": processing_dir
"processingDirectory": processing_dir,
}
def arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Workspaces CASA processing launcher",
formatter_class=argparse.RawTextHelpFormatter
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--standard-cal",
nargs=2,
action="store",
required=False,
help="run the standard calibration CASA pipeline"
help="run the standard calibration CASA pipeline",
)
parser.add_argument(
"--standard-img",
nargs=2,
action="store",
required=False,
help="run the standard calibration CASA pipeline",
)
return parser
def check_calibratable(metadata_filename) -> bool:
with open(str(metadata_filename)) as json_meta:
def check_calibratable(metadata_filename: str) -> bool:
with open(metadata_filename) as json_meta:
metadata = json.loads(json_meta.read())
spl = metadata['productLocator']
spl = metadata["productLocator"]
if "execblock" in spl:
return True
else:
......@@ -122,6 +74,18 @@ def check_calibratable(metadata_filename) -> bool:
return False
def check_imagable(metadata_filename: str) -> bool:
with open(metadata_filename) as json_meta:
metadata = json.loads(json_meta.read())
cms_name = metadata["cmsName"]
cms_path = metadata["calibrationSourceDirectory"]
if cms_name is not None and cms_path is not None and cms_name[-3:] == ".ms":
return True
else:
logger.info("CMS ERROR: Imaging requires a valid CMS name and location!")
return False
def run_audit(ppr: str, metadata: str, settings: Dict[str, str]):
dir_audit = AuditDirectories(ppr, settings).audit()
if dir_audit:
......@@ -140,16 +104,16 @@ def run_audit(ppr: str, metadata: str, settings: Dict[str, str]):
def main():
args = arg_parser().parse_args()
metadata = args.standard_cal[0]
ppr = args.standard_cal[1]
if check_calibratable(metadata):
path = os.getcwd()
settings = _get_settings(pathlib.Path(path))
path = os.getcwd()
settings = _get_settings(pathlib.Path(path))
run_audit(ppr, metadata, settings)
if args.standard_cal is not None:
metadata = args.standard_cal[0]
ppr = args.standard_cal[1]
if args.standard_cal:
run_audit(ppr, metadata, settings)
if check_calibratable(metadata):
if settings.get("useCasa"):
launcher = CalibrationLauncher(ppr, metadata)
launcher.setup_environment(settings)
......@@ -160,6 +124,25 @@ def main():
else:
logger.info("RUNNING VELA!")
subprocess.run(["./vela", "--standard-cal", metadata, ppr])
else:
logger.info("TYPE ERROR: Provided SPL is not type execution block!")
sys.exit(1)
else:
logger.error("ERROR: Provided SPL is not type execution block!")
sys.exit(1)
elif args.standard_img is not None:
metadata = args.standard_img[0]
ppr = args.standard_img[1]
run_audit(ppr, metadata, settings)
if check_imagable(metadata):
if settings.get("useCasa"):
launcher = ImagingLauncher(ppr, metadata)
launcher.setup_environment(settings)
launcher.run()
launcher.check_logs(parent_path=path)
# return to parent directory after processing
os.chdir(path)
else:
logger.info("RUNNING VELA!")
subprocess.run(["./vela", "--standard-img", metadata, ppr])
else:
logger.error("ERROR: CMS information missing or incorrect!")
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ns2:SciPipeRequest xmlns:ns2="Common/pipelinescience/SciPipeRequest">
<ProjectSummary>
<ProposalCode>VLA/null</ProposalCode>
<Observatory>NRAO</Observatory>
<Telescope>VLA</Telescope>
<ProcessingSite>Socorro</ProcessingSite>
<Operator>vlapipe</Operator>
<Mode>SCIENCE</Mode>
<Version>NGRH-ALMA-10_8</Version>
<CreationTime>2021-07-01T21:14:00</CreationTime>
</ProjectSummary>
<ProjectStructure>TBD</ProjectStructure>
<ProcessingRequests>
<RootDirectory>/lustre/aoc/cluster/pipeline/docker/workspaces/spool</RootDirectory>
<ProcessingRequest>
<ProcessingIntents>
<Intents>
<Keyword>VLA_INTERFEROMETRY_STANDARD_OBSERVING_MODE</Keyword>
<Value>Undefined</Value>
</Intents>
</ProcessingIntents>
<ProcessingProcedure>
<ProcedureTitle>hifv_contimage</ProcedureTitle>
<ProcessingCommand>
<Command xmlns="">hifv_importdata</Command>
<ParameterSet>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_mstransform</Command>
<ParameterSet>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_checkproductsize</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">maximsize</Keyword>
<Value xmlns="">16384</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_makeimlist</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">specmode</Keyword>
<Value xmlns="">cont</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_makeimages</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">hm_masking</Keyword>
<Value xmlns="">none</Value>
</Parameter>
<Parameter>
<Keyword xmlns="">hm_cyclefactor</Keyword>
<Value xmlns="">3.0</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hifv_exportdata</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">imaging_products_only</Keyword>
<Value xmlns="">True</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
</ProcessingProcedure>
<DataSet>
<RelativePath>tmpo1ca1pp_</RelativePath>
<SdmIdentifier>brain_000.58099.67095825232.ms</SdmIdentifier>
<DataType>asdm</DataType>
</DataSet>
</ProcessingRequest>0
</ProcessingRequests>
<ResultsProcessing>
<ArchiveResults>false</ArchiveResults>
<CleanUpDisk>false</CleanUpDisk>
<UpdateProjectLifeCycle>false</UpdateProjectLifeCycle>
<NotifyOperatorWhenDone>false</NotifyOperatorWhenDone>
<SDMall>false</SDMall>
<SDMonly>false</SDMonly>
<PipelineOperatorAddress>Unknown</PipelineOperatorAddress>
</ResultsProcessing>
</ns2:SciPipeRequest>
{
"fileSetIds": "brain_000.58099.67095825232",
"workflowName": "std_cms_imaging",
"systemId": "4",
"productLocator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
"projectMetadata": {
"projectCode": "Operations",
"title": "",
"startTime": "58099.6710792824",
"observer": "VLA Operations"
},
"destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_",
"calibrationSourceDirectory":"/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpowrvygml/working",
"cmsName":"brain_000.58099.67095825232.ms"
}
"""
Tests for casa_envoy.palaver
"""
import os
from unittest.mock import mock_open, patch
from casa_envoy.palaver import (
CalibrationLauncher,
check_calibratable,
)
class TestCalibrationLauncher:
@patch("os.chdir")
def test_run(self, mock_os):
with patch("subprocess.Popen") as sp:
CalibrationLauncher("test/PPR.xml", "test/test.json").run()
sp.call_count == 1
def test_setup_environment(self):
parameters = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp"
}
CalibrationLauncher("PPR.xml", "test.json").setup_environment(parameters=parameters)
assert os.environ.get("SCIPIPE_ROOTDIR") == parameters["rootDirectory"]
assert os.environ.get("CASA_HOME") == parameters["homeForReprocessing"]
assert os.environ.get("PPR_FILENAME") == "PPR.xml"
@patch("os.chdir")
@patch("os.getcwd")
def test_check_logs(self, mock_os_cwd, mock_os_dir):
with patch("builtins.open", mock_open()) as o:
CalibrationLauncher("PPR.xml", "test.json").check_logs(parent_path=".")
o.call_count == 1
import argparse
from unittest.mock import patch, MagicMock
from casa_envoy.palaver import check_calibratable, check_imagable, _get_settings
expected_settings = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool",
"processingDirectory": "tmpo1ca1pp_",
}
args = argparse.Namespace()
def test_check_calibratable():
check = check_calibratable("test/test.json")
assert check == True
assert check is True
def test_check_imagable():
check = check_imagable("test/image-metadata.json")
assert check is True
def test_get_settings():
with patch(
"pathlib.Path.cwd",
MagicMock(return_value="/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_"),
) as cwd:
settings = _get_settings(cwd)
assert settings["useCasa"] == expected_settings["useCasa"]
assert settings["homeForReprocessing"] == expected_settings["homeForReprocessing"]
import os
from unittest.mock import mock_open, patch
from casa_envoy.launchers import CalibrationLauncher, ImagingLauncher
class TestCalibrationLauncher:
@patch("subprocess.Popen")
@patch("os.chdir")
def test_run(self, mock_os, mock_subprocess):
CalibrationLauncher("test/PPR.xml", "test/test.json").run()
assert mock_subprocess.call_count == 1
def test_setup_environment(self):
parameters = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
}
CalibrationLauncher("PPR.xml", "test.json").setup_environment(parameters=parameters)
assert os.environ.get("SCIPIPE_ROOTDIR") == parameters["rootDirectory"]
assert os.environ.get("CASA_HOME") == parameters["homeForReprocessing"]
assert os.environ.get("PPR_FILENAME") == "PPR.xml"
@patch("builtins.open")
@patch("glob.glob")
@patch("os.chdir")
@patch("os.getcwd")
def test_check_logs(self, mock_os_cwd, mock_os_dir, mock_glob, mock_open):
CalibrationLauncher("PPR.xml", "test.json").check_logs(parent_path=".")
assert mock_os_cwd.call_count == 1
assert mock_os_dir.call_count == 0
assert mock_glob.call_count == 1
class TestImagingLauncher:
@patch("os.chdir")
def test_run(self, mock_os):
with patch("subprocess.Popen") as sp:
ImagingLauncher("test/cmsimage-PPR.xml", "test/image-metadata.json").run()
assert sp.call_count == 1
def test_setup_environment(self):
parameters = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
}
ppr = "cmsimage-PPR.xml"
metadata = "image-metadata.json"
ImagingLauncher(ppr, metadata).setup_environment(parameters=parameters)
assert os.environ.get("SCIPIPE_ROOTDIR") == parameters["rootDirectory"]
assert os.environ.get("CASA_HOME") == parameters["homeForReprocessing"]
assert os.environ.get("PPR_FILENAME") == ppr
@patch("builtins.open")
@patch("glob.glob")
@patch("os.chdir")
@patch("os.getcwd")
def test_check_logs(self, mock_os_cwd, mock_os_dir, mock_glob, mock_open):
ppr = "cmsimage-PPR.xml"
metadata = "image-metadata.json"
ImagingLauncher(ppr, metadata).check_logs(parent_path=".")
assert mock_os_cwd.call_count == 1
assert mock_os_dir.call_count == 0
assert mock_glob.call_count == 1
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ns2:SciPipeRequest xmlns:ns2="Common/pipelinescience/SciPipeRequest">
<ProjectSummary>
<ProposalCode>VLA/null</ProposalCode>
<Observatory>NRAO</Observatory>
<Telescope>VLA</Telescope>
<ProcessingSite>Socorro</ProcessingSite>
<Operator>vlapipe</Operator>
<Mode>SCIENCE</Mode>
<Version>NGRH-ALMA-10_8</Version>
<CreationTime>2021-07-01T21:14:00</CreationTime>
</ProjectSummary>
<ProjectStructure>TBD</ProjectStructure>
<ProcessingRequests>
<RootDirectory>/lustre/aoc/cluster/pipeline/docker/workspaces/spool</RootDirectory>
<ProcessingRequest>
<ProcessingIntents>
<Intents>
<Keyword>VLA_INTERFEROMETRY_STANDARD_OBSERVING_MODE</Keyword>
<Value>Undefined</Value>
</Intents>
</ProcessingIntents>
<ProcessingProcedure>
<ProcedureTitle>hifv_contimage</ProcedureTitle>
<ProcessingCommand>
<Command xmlns="">hifv_importdata</Command>
<ParameterSet>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_mstransform</Command>
<ParameterSet>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_checkproductsize</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">maximsize</Keyword>
<Value xmlns="">16384</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_makeimlist</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">specmode</Keyword>
<Value xmlns="">cont</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_makeimages</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">hm_masking</Keyword>
<Value xmlns="">none</Value>
</Parameter>
<Parameter>
<Keyword xmlns="">hm_cyclefactor</Keyword>
<Value xmlns="">3.0</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hifv_exportdata</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">imaging_products_only</Keyword>
<Value xmlns="">True</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
</ProcessingProcedure>
<DataSet>
<RelativePath>tmpo1ca1pp_</RelativePath>
<SdmIdentifier>brain_000.58099.67095825232.ms</SdmIdentifier>
<DataType>asdm</DataType>
</DataSet>
</ProcessingRequest>0
</ProcessingRequests>
<ResultsProcessing>
<ArchiveResults>false</ArchiveResults>
<CleanUpDisk>false</CleanUpDisk>
<UpdateProjectLifeCycle>false</UpdateProjectLifeCycle>
<NotifyOperatorWhenDone>false</NotifyOperatorWhenDone>
<SDMall>false</SDMall>
<SDMonly>false</SDMonly>
<PipelineOperatorAddress>Unknown</PipelineOperatorAddress>
</ResultsProcessing>
</ns2:SciPipeRequest>
{
"fileSetIds": "brain_000.58099.67095825232",
"workflowName": "std_cms_imaging",
"systemId": "4",
"productLocator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
"projectMetadata": {
"projectCode": "Operations",
"title": "",
"startTime": "58099.6710792824",
"observer": "VLA Operations"
},
"destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_",
"calibrationSourceDirectory":"/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpowrvygml/working",
"cmsName":"brain_000.58099.67095825232.ms"
}
"""
Tests for vela.emulators
"""
import os
from unittest.mock import patch, mock_open
import pytest
from vela.emulators import CalibrationEmulator, ImagingEmulator
from vela.forger import forge
class TestCalibrationEmulator:
@pytest.mark.skip("Ignores forger mock.")
@patch("os.chdir")
@patch("vela.forger.VelaProduct.forge_products")
@patch("vela.forger.VelaLog.forge_logs")
def test_run(self, mock_logs, mock_products, mock_os):
with patch("vela.forger.forge") as mock_forge:
CalibrationEmulator("test/test.json", "test/PPR.xml").run()
assert mock_forge.call_count == 1
def test_setup_environment(self):
parameters = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
}
CalibrationEmulator("test.json", "PPR.xml").setup_environment(parameters=parameters)
assert os.environ.get("SCIPIPE_ROOTDIR") == parameters["rootDirectory"]
assert os.environ.get("CASA_HOME") == parameters["homeForReprocessing"]
assert os.environ.get("PPR_FILENAME") == "PPR.xml"
@patch("glob.glob")
@patch("os.chdir")
def test_check_logs(self, mock_os, mock_glob):
with patch("builtins.open", mock_open()) as o:
CalibrationEmulator("test.json", "PPR.xml").check_logs(parent_path=mock_os)
assert o.call_count == 0
assert mock_os.call_count == 1
assert mock_glob.call_count == 1
class TestImagingEmulator:
@pytest.mark.skip("Ignores forger mock.")
@patch("os.chdir")
@patch("vela.forger.VelaProduct.forge_products")
@patch("vela.forger.VelaLog.forge_logs")
def test_run(self, mock_logs, mock_products, mock_os):
with patch("vela.forger.forge") as mock_forge:
ImagingEmulator("test/test.json", "test/PPR.xml").run()
assert mock_forge.call_count == 1
def test_setup_environment(self):
parameters = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
}
ImagingEmulator("test.json", "PPR.xml").setup_environment(parameters=parameters)
assert os.environ.get("SCIPIPE_ROOTDIR") == parameters["rootDirectory"]
assert os.environ.get("CASA_HOME") == parameters["homeForReprocessing"]
assert os.environ.get("PPR_FILENAME") == "PPR.xml"
@patch("glob.glob")
@patch("os.chdir")
def test_check_logs(self, mock_os, mock_glob):
with patch("builtins.open", mock_open()) as o:
ImagingEmulator("test.json", "PPR.xml").check_logs(parent_path=mock_os)
assert o.call_count == 0
assert mock_os.call_count == 1
assert mock_glob.call_count == 1
......@@ -6,28 +6,71 @@ from vela.forger import read_metadata, VelaProduct, VelaLog
class TestVelaLog:
def test_forge_logs(self):
parameters_cal = read_metadata("test/test.json")
with patch("vela.forger.VelaLog.cal_logs") as cal_logs:
VelaLog("test.json").forge_logs(parameters=parameters_cal)
assert cal_logs.call_count == 1
parameters_img = read_metadata("test/image-metadata.json")
with patch("vela.forger.VelaLog.img_logs") as img_logs:
VelaLog("image-metadata.json").forge_logs(parameters=parameters_img)
assert img_logs.call_count == 1
@patch("os.chdir")
@patch("pendulum.now")
def test_forge_logs(self, mock_pendulum, mock_os):
parameters = read_metadata("test/test.json")
def test_cal_logs(self, mock_pendulum, mock_os):
with patch("builtins.open", mock_open()) as o:
VelaLog("test.json").forge_logs(parameters=parameters)
o.call_count == 3
VelaLog("test.json").cal_logs()
assert o.call_count == 3
handle = o()
handle.write.call_count == 3
assert handle.write.call_count == 3
@patch("os.chdir")
@patch("pendulum.now")
def test_img_logs(self, mock_pendulum, mock_os):
with patch("builtins.open", mock_open()) as o:
VelaLog("image-metadata.json").img_logs()
assert o.call_count == 3
handle = o()
assert handle.write.call_count == 3
class TestVelaProduct:
def test_forge_products(self):
parameters_cal = read_metadata("test/test.json")
with patch("vela.forger.VelaProduct.cal_products") as cal_products:
VelaProduct("test.json").forge_products(parameters=parameters_cal)
assert cal_products.call_count == 1
parameters_img = read_metadata("test/image-metadata.json")
with patch("vela.forger.VelaProduct.img_products") as img_products:
VelaProduct("image-metadata.json").forge_products(parameters=parameters_img)
assert img_products.call_count == 1
@patch("os.chdir")
@patch("vela.forger.VelaProduct.forge_weblog")
def test_forge_cal_products(self, mock_forge, mock_os):
@patch("vela.forger.VelaProduct.forge_measurement_set")
def test_cal_products(self, mock_ms, mock_forge, mock_os):
parameters = read_metadata("test/test.json")
with patch("builtins.open", mock_open()) as o:
VelaProduct("test.json").forge_cal_products(parameters=parameters)
VelaProduct("test.json").cal_products(parameters=parameters)
o.assert_called_once_with("brain_001.58099.678886747686.ms.calapply.txt", "x")
handle = o()
handle.write.assert_called_once_with("I am a calibration file.")
@patch("os.mkdir")
@patch("os.chdir")
def test_forge_measurement_set(self, mock_chdir, mock_mkdir):
with patch("builtins.open", mock_open()) as o:
VelaProduct("test.json").forge_measurement_set(sdmId="brain_001.58099.678886747686")
assert o.call_count == 5
handle = o()
assert handle.write.call_count == 5
assert mock_mkdir.call_count == 6
assert mock_chdir.call_count == 2
@patch("os.mkdir")
@patch("os.chdir")
@patch("pendulum.now")
......@@ -35,9 +78,9 @@ class TestVelaProduct:
def test_forge_weblog(self, mock_tar, mock_pendulum, mock_os_chdir, mock_os_mkdir):
with patch("builtins.open", mock_open()) as o:
VelaProduct("test.json").forge_weblog(path="/tmp/workspaces_tmp/testing")
o.call_count == 3
assert o.call_count == 3
handle = o()
handle.write.call_count == 3
assert handle.write.call_count == 3
def test_read_metadata():
......@@ -45,10 +88,11 @@ def test_read_metadata():
assert parameters["fileSetIds"] == "brain_001.58099.678886747686"
assert parameters["workflowName"] == "std_calibration"
assert parameters["systemId"] == "5"
assert parameters["productLocator"] == "uid://evla/execblock/9cb0964b-ad6b-40ed-bd87-d08c502503e2"
assert (
parameters["productLocator"] == "uid://evla/execblock/9cb0964b-ad6b-40ed-bd87-d08c502503e2"
)
project = parameters.get("projectMetadata")
assert project["projectCode"] == "Operations"
assert project["title"] == ""
assert project["startTime"] == "58099.6790081019"
assert project["observer"] == "VLA Operations"
"""
Tests for Vela
"""
import os
from unittest.mock import patch, mock_open
import argparse
from unittest.mock import patch, MagicMock
from vela.quasar import CalibrationEmulator
import vela.quasar as quasar
expected_settings = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool",
"processingDirectory": "tmpabcd1234",
}
args = argparse.Namespace()
class TestQuasar:
def test_get_settings(self):
with patch(
"pathlib.Path.cwd",
MagicMock(
return_value="/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpabcd1234"
),
) as cwd:
settings = quasar._get_settings(cwd)
assert settings["useCasa"] == expected_settings["useCasa"]
assert settings["homeForReprocessing"] == expected_settings["homeForReprocessing"]
class TestCalibrationEmulator:
@patch("os.chdir")
@patch("vela.forger.VelaProduct.forge_cal_products")
@patch("vela.forger.VelaLog.forge_logs")
def test_run(self, mock_logs, mock_products, mock_os):
with patch("vela.forger.forge") as forge:
CalibrationEmulator("test/test.json", "test/PPR.xml").run()
forge.call_count == 1
def test_setup_environment(self):
parameters = {"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/"}
CalibrationEmulator("test.json", "PPR.xml").setup_environment(parameters=parameters)
assert os.environ.get('SCIPIPE_ROOTDIR') == parameters["rootDirectory"]
assert os.environ.get('CASA_HOME') == parameters["homeForReprocessing"]
assert os.environ.get('PPR_FILENAME') == "PPR.xml"
@patch("pathlib.Path")
@patch("os.getcwd")
def test_main_stdcal(self, mock_cwd, mock_path, mock_chdir):
args.standard_cal = ["test/test.json", "test/PPR.xml"]
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
with patch("vela.emulators.CalibrationEmulator.run") as run:
quasar.main()
assert run.call_count == 1
# reset for testing
args.standard_cal = None
@patch("os.chdir")
def test_check_logs(self, mock_os):
with patch("builtins.open", mock_open()) as o:
CalibrationEmulator("test.json", "PPR.xml").check_logs(parent_path=mock_os)
o.call_count == 1
@patch("pathlib.Path")
@patch("os.getcwd")
def test_main_stdimg(self, mock_cwd, mock_path, mock_chdir):
args.standard_img = ["test/image-metadata.json", "test/cmsimage-PPR.xml"]
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
with patch("vela.emulators.ImagingEmulator.run") as run:
quasar.main()
assert run.call_count == 1
# reset for testing
args.standard_img = None
import glob
import logging
import os
import re
import sys
from vela.forger import forge
from vela.interfaces import CasaEmulatorIF
"""
Emulate a CASA run
"""
def get_base_environment(parameters: dict):
os.environ["SCIPIPE_ROOTDIR"] = parameters["rootDirectory"]
os.environ["CASA_HOME"] = parameters["homeForReprocessing"]
os.environ["LANG"] = "en_US.UTF-8"
def check_processing_env(logger: logging.Logger):
logger.info("Checking processing environment:")
env1 = os.environ.get("SCIPIPE_ROOTDIR")
logger.info(f"SCIPIPE_ROOTDIR: {env1}")
env2 = os.environ.get("CASA_HOME")
logger.info(f"CASA_HOME: {env2}")
env3 = os.environ.get("PPR_FILENAME")
logger.info(f"PPR_FILENAME: {env3}")
env4 = os.environ.get("LANG")
logger.info(f"LANG: {env4}")
if "None" in [env1, env2, env3, env4]:
logger.info("Environment setup Failed!")
sys.exit(1)
else:
logger.info("Environment ready for processing")
class CalibrationEmulator(CasaEmulatorIF):
def __init__(self, metadata: str, ppr: str):
self.logger = logging.getLogger("vela")
self.ppr = ppr
self.metadata = metadata
def run(self):
self.logger.info("Vela waiting on the Forger....")
forge(self.metadata)
def setup_environment(self, parameters: dict):
get_base_environment(parameters)
os.environ["PPR_FILENAME"] = str(self.ppr)
check_processing_env(self.logger)
def check_logs(self, parent_path: str):
self.logger.info("CHECKING VELA LOGS!")
if not os.getcwd().endswith("/working"):
os.chdir(parent_path + "/working")
vela_logs = glob.glob("vela-*.log")
for file in vela_logs:
if re.match("^.*SEVERE\sflagmanager.*$", open(file).read()):
self.logger.info("VELA ERROR! Please inspect logs.")
else:
self.logger.info("VELA Success!")
class ImagingEmulator(CasaEmulatorIF):
def __init__(self, metadata: str, ppr: str):
self.logger = logging.getLogger("vela")
self.metadata = metadata
self.ppr = ppr
def run(self):
self.logger.info("Vela waiting on the Forger....")
forge(self.metadata)
def setup_environment(self, parameters: dict):
get_base_environment(parameters)
os.environ["PPR_FILENAME"] = str(self.ppr)
check_processing_env(self.logger)
def check_logs(self, parent_path: str):
self.logger.info("CHECKING VELA LOGS!")
if not os.getcwd().endswith("/working"):
os.chdir(parent_path + "/working")
vela_logs = glob.glob("vela-*.log")
for file in vela_logs:
if re.match("^.*SEVERE\sflagmanager.*$", open(file).read()):
self.logger.info("VELA ERROR! Please inspect logs.")
else:
self.logger.info("VELA Success!")
......@@ -23,6 +23,14 @@ class VelaLog:
# vela-pipeline.sh.err.txt
# vela-<date_as_string>-<timestring>.log
workflow_name = parameters["workflowName"]
if "calibration" in workflow_name:
self.cal_logs()
elif "imaging" in workflow_name:
self.img_logs()
def cal_logs(self):
path = Path.cwd() / "working"
os.chdir(path)
......@@ -35,7 +43,7 @@ class VelaLog:
pipe_err.write(forged_content())
pipe_err.close()
datestring = pendulum.now().format('YYYYMMDD')
datestring = pendulum.now().format("YYYYMMDD")
timestring = pendulum.now().utcnow().format("hhmmss")
log = open(f"vela-{datestring}-{timestring}.log", "w")
......@@ -45,13 +53,25 @@ class VelaLog:
self.logger.info("Logs forging complete.")
os.chdir("../")
def img_logs(self):
# TODO: find out if logs are different for imaging
self.cal_logs()
class VelaProduct:
def __init__(self, metadata: str):
self.logger = logging.getLogger("vela")
self.metadata = metadata
def forge_cal_products(self, parameters: dict):
def forge_products(self, parameters: dict):
workflow_name = parameters["workflowName"]
if "calibration" in workflow_name:
self.cal_products(parameters)
elif "imaging" in workflow_name:
self.img_products(parameters)
def cal_products(self, parameters: dict):
# forge products so there is something to test delivery with
path = Path.cwd() / "products"
......@@ -65,16 +85,37 @@ class VelaProduct:
calapply.close()
os.chdir("../working")
self.forge_measurement_set(sdm_id)
self.forge_weblog(os.getcwd())
self.logger.info("Forging products complete.")
os.chdir("../")
def img_products(self, parameters: dict):
# TODO: find out products to forge
self.forge_measurement_set(parameters["fileSetIds"])
def forge_measurement_set(self, sdmId: str):
self.logger.info("Forging Measurement Set....")
cms_name = sdmId + ".ms"
os.mkdir(cms_name)
os.chdir("./" + cms_name)
for i in range(0, 5):
dirname = f"FORGED_MS_DIR_{i}"
os.mkdir(dirname)
file = open(f"table.f{i}", "x")
file.write("I am a table.")
file.close()
os.chdir("../")
self.logger.info("MS forging complete.")
def forge_weblog(self, path):
self.logger.info("Forging weblog...")
datestring = pendulum.now().format('YYYYMMDD')
timestring = pendulum.now().utcnow().format('hhmmss')
datestring = pendulum.now().format("YYYYMMDD")
timestring = pendulum.now().utcnow().format("hhmmss")
dirname = f"pipeline-{datestring}-{timestring}"
os.mkdir(dirname)
os.chdir(path + "/" + dirname)
......@@ -118,4 +159,4 @@ def forge(metadata: str):
fake_logs.forge_logs(parameters)
fake_products = VelaProduct(metadata)
fake_products.forge_cal_products(parameters)
fake_products.forge_products(parameters)
......@@ -9,11 +9,12 @@ class CasaEmulatorIF(ABC):
Generic CASA Emulator methods for Vela.
Should be implemented for any type of CASA emulated processing.
"""
def run(self):
raise NotImplementedError
def setup_environment(self, parameters: dict):
raise NotImplementedError
def check_logs(self):
def check_logs(self, parent_path: str):
raise NotImplementedError
......@@ -8,13 +8,10 @@ import os
import pathlib
import sys
import glob
import re
from pycapo import CapoConfig
from .interfaces import CasaEmulatorIF
from .forger import forge
from vela.emulators import CalibrationEmulator
from vela.emulators import ImagingEmulator
"""
The Vela system allows for testing Workspaces workflows without submitting to CASA on the cluster.
......@@ -25,54 +22,11 @@ logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
class CalibrationEmulator(CasaEmulatorIF):
def __init__(self, metadata: str, ppr: str):
self.ppr = ppr
self.metadata = metadata
def run(self):
logger.info("Vela waiting on the Forger....")
forge(self.metadata)
def setup_environment(self, parameters: dict):
os.environ['SCIPIPE_ROOTDIR'] = parameters["rootDirectory"]
os.environ['CASA_HOME'] = parameters["homeForReprocessing"]
os.environ['PPR_FILENAME'] = str(self.ppr)
os.environ['LANG'] = "en_US.UTF-8"
logger.info("Checking processing environment:")
env1 = os.environ.get('SCIPIPE_ROOTDIR')
logger.info(f"SCIPIPE_ROOTDIR: {env1}")
env2 = os.environ.get('CASA_HOME')
logger.info(f"CASA_HOME: {env2}")
env3 = os.environ.get('PPR_FILENAME')
logger.info(f"PPR_FILENAME: {env3}")
env4 = os.environ.get('LANG')
logger.info(f"LANG: {env4}")
if 'None' in [env1, env2, env3, env4]:
logger.info("Environment setup Failed!")
sys.exit(1)
else:
logger.info("Environment ready for processing")
def check_logs(self, parent_path: str):
logger.info("CHECKING VELA LOGS!")
if not os.getcwd().endswith("/working"):
os.chdir(parent_path + "/working")
vela_logs = glob.glob('vela-*.log')
for file in vela_logs:
if re.match("^.*SEVERE\sflagmanager.*$", open(file).read()):
logger.info("VELA ERROR! Please inspect logs.")
else:
logger.info("VELA Success!")
def _get_settings(cwd: pathlib.Path):
use_casa = CapoConfig().getboolean("edu.nrao.archive.workspaces.ProcessingSettings.useCasa")
casa_home = CapoConfig().settings("edu.nrao.archive.workflow.config.CasaVersions").homeForReprocessing
casa_home = (
CapoConfig().settings("edu.nrao.archive.workflow.config.CasaVersions").homeForReprocessing
)
root_dir = str(cwd.parent)
processing_dir = str(cwd.stem)
......@@ -81,44 +35,49 @@ def _get_settings(cwd: pathlib.Path):
"useCasa": use_casa,
"homeForReprocessing": casa_home,
"rootDirectory": root_dir,
"processingDirectory": processing_dir
"processingDirectory": processing_dir,
}
def parser() -> argparse.ArgumentParser:
arg_parser = argparse.ArgumentParser(
description="Workspaces VELA processing launcher",
formatter_class=argparse.RawTextHelpFormatter
formatter_class=argparse.RawTextHelpFormatter,
)
arg_parser.add_argument(
"--standard-cal",
nargs=2,
action="store",
required=False,
help="run the standard calibration pipeline"
help="run the standard calibration pipeline",
)
arg_parser.add_argument(
"--standard-image",
"--standard-img",
nargs=2,
action="store",
required=False,
help="run the standard imaging pipeline"
help="run the standard imaging pipeline",
)
return arg_parser
def main():
args = parser().parse_args()
metadata = args.standard_cal[0]
ppr = args.standard_cal[1]
path = os.getcwd()
settings = _get_settings(pathlib.Path(path))
if args.standard_cal:
if args.standard_cal is not None:
metadata = args.standard_cal[0]
ppr = args.standard_cal[1]
emulator = CalibrationEmulator(metadata, ppr)
emulator.setup_environment(settings)
emulator.run()
emulator.check_logs(parent_path=path)
os.chdir(path)
elif args.standard_img is not None:
metadata = args.standard_img[0]
ppr = args.standard_img[1]
emulator = ImagingEmulator(metadata, ppr)
else:
logger.info("ARGUMENT ERROR: no valid argument was provided.")
emulator.setup_environment(settings)
emulator.run()
emulator.check_logs(parent_path=path)
os.chdir(path)
......@@ -173,6 +173,9 @@ class WorkflowService(WorkflowServiceIF):
# render all the templates
if request.argument["need_project_metadata"] is True:
templated_files = self._render_with_metadata(request, temp_folder, definition)
# catch aat_wrest failure and abort workflow
if isinstance(templated_files, WorkflowRequest):
return request
else:
templated_files = definition.render_templates(request.argument)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment