Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: ssa/workspaces
Commits on Source (7)
Showing with 1167 additions and 533 deletions
......@@ -14,49 +14,57 @@ from casa_envoy.interfaces import AuditorIF
from casa_envoy.schema import AbstractTextFile
def get_fields_for(filename: str) -> list:
metadata_list = ["fileSetIds", "workflowName", "systemId", "productLocator", "destinationDirectory"]
def get_fields_for(product_type: str, filename: str) -> list:
cal_metadata_list = [
"fileSetIds",
"workflowName",
"systemId",
"creationTime",
"productLocator",
"destinationDirectory",
]
img_metadata_list = [
"fileSetIds",
"workflowName",
"systemId",
"creationTime",
"productLocator",
"destinationDirectory",
"cms_path",
"sdmId",
]
ppr_list = ["RootDirectory", "RelativePath", "SdmIdentifier"]
if ".xml" in filename:
return ppr_list
if ".json" in filename:
return metadata_list
if ".json" in filename and "cal" in product_type:
return cal_metadata_list
elif ".json" in filename and "img" in product_type:
return img_metadata_list
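For reference (an editorial sketch, not part of this diff): the reworked get_fields_for now dispatches on both product type and file extension. The calls below are hypothetical; note that a JSON file whose product type matches neither "cal" nor "img" falls through and returns None.

get_fields_for("standard-cal", "PPR.xml")        # -> ["RootDirectory", "RelativePath", "SdmIdentifier"]
get_fields_for("standard-cal", "metadata.json")  # -> the six calibration metadata fields
get_fields_for("standard-img", "metadata.json")  # -> the eight imaging fields, adding "cms_path" and "sdmId"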
def get_xml_content(filename: str):
with open(filename) as file:
return BeautifulSoup(file.read(), "xml")
def get_xml_content(file: AbstractTextFile):
return BeautifulSoup(file.content, "xml")
def get_value_for(filename: str, key: str) -> str:
if ".xml" in filename:
ppr_content = get_xml_content(filename)
def get_value_for(file: AbstractTextFile, key: str) -> str:
if ".xml" in file.filename:
ppr_content = get_xml_content(file)
return ppr_content.find(key).string
if ".json" in filename:
with open(filename) as file:
metadata = json.loads(file.read())
return metadata[key]
if ".json" in file.filename:
content = json.loads(file.content)
return content[key]
class AuditFiles(AuditorIF):
def __init__(self, files: List[str], settings: Dict[str, str]):
def __init__(self, files: List[AbstractTextFile], settings: Dict[str, str]):
self.logger = logging.getLogger("casa_envoy")
self.files = files
self.settings = settings
self.logger = logging.getLogger("casa_envoy")
def read_file(self, filename: str) -> AbstractTextFile:
if os.path.isfile(filename):
with open(filename) as file:
if ".json" in filename:
metadata = json.loads(file.read())
return AbstractTextFile(filename, json.dumps(metadata))
else:
if ".xml" in filename:
ppr = file.read()
return AbstractTextFile(filename, ppr)
self.product_type = settings.get("product_type")
def check_required_fields(self, file: AbstractTextFile, fields: list) -> bool:
missing = []
......@@ -83,7 +91,7 @@ class AuditFiles(AuditorIF):
shutil.copy(ppr.filename, "./working")
os.chdir("./working")
parsed_xml = get_xml_content(ppr.filename)
parsed_xml = get_xml_content(ppr)
parsed_xml.find("RootDirectory").string = self.settings["rootDirectory"]
parsed_xml.find("RelativePath").string = self.settings["processingDirectory"]
......@@ -97,15 +105,16 @@ class AuditFiles(AuditorIF):
invalid_files = []
for file in self.files:
self.logger.info(f"Auditing file {file}...")
f = self.read_file(file)
if f.filename == "PPR.xml":
if file.filename == "PPR.xml":
self.logger.info("Correcting PPR.xml for condor processing...")
f = self.correct_for_condor(f)
file = self.correct_for_condor(file)
valid = self.check_required_fields(file=f, fields=get_fields_for(f.filename))
valid = self.check_required_fields(
file=file, fields=get_fields_for(self.product_type, file.filename)
)
if not valid:
invalid_files.append(f.filename)
invalid_files.append(file.filename)
if len(invalid_files) != 0:
self.logger.info(f"INVALID FILE FOUND: {invalid_files}")
......@@ -115,7 +124,7 @@ class AuditFiles(AuditorIF):
class AuditDirectories(AuditorIF):
def __init__(self, ppr: str, settings: Dict[str, str]):
def __init__(self, ppr: AbstractTextFile, settings: Dict[str, str]):
self.logger = logging.getLogger("casa_envoy")
self.rootDirectory = settings["rootDirectory"]
self.relative_path = settings["processingDirectory"]
......@@ -142,4 +151,6 @@ class AuditDirectories(AuditorIF):
self.logger.info("FAILURE: data not found in rawdata/")
return False
else:
self.logger.info("DIRECTORY ERROR: A directory is missing from the processing root directory.")
self.logger.info(
"DIRECTORY ERROR: A directory is missing from the processing root directory."
)
"""
Interfaces for casa_envoy
"""
import abc
from abc import ABC
from typing import Dict
class CasaLauncherIF(ABC):
class LauncherIF(ABC):
"""
Generic CASA Launcher methods.
Generic Launcher methods.
Should be implemented for any type of CASA processing.
"""
def run(self):
raise NotImplementedError
def setup_environment(self, parameters: dict):
raise NotImplementedError
@abc.abstractmethod
def launch_casa(self):
pass
def check_logs(self, parent_path: str):
raise NotImplementedError
@abc.abstractmethod
def run_audit(self, parameters: Dict[str, str]):
pass
class AuditorIF(ABC):
"""
Generic functionality implementation for auditor classes
"""
def audit(self):
raise NotImplementedError
......@@ -4,106 +4,84 @@ import glob
import sys
import os
import subprocess
from typing import Dict
from casa_envoy.interfaces import CasaLauncherIF
from casa_envoy.schema import AbstractTextFile
import json
from casa_envoy.auditor import AuditFiles, AuditDirectories
from casa_envoy.interfaces import LauncherIF
from casa_envoy.schema import AbstractTextFile
def get_base_environment(parameters: dict):
os.environ["SCIPIPE_ROOTDIR"] = parameters["rootDirectory"]
os.environ["CASA_HOME"] = parameters["homeForReprocessing"]
os.environ["LANG"] = "en_US.UTF-8"
def get_abs_file(filename: str) -> AbstractTextFile:
with open(filename) as file:
if ".json" in filename:
content = json.loads(file.read())
elif ".xml" in filename:
content = file.read()
def check_processing_env(logger: logging.Logger):
logger.info("Checking processing environment:")
env1 = os.environ.get("SCIPIPE_ROOTDIR")
logger.info(f"SCIPIPE_ROOTDIR: {env1}")
env2 = os.environ.get("CASA_HOME")
logger.info(f"CASA_HOME: {env2}")
env3 = os.environ.get("PPR_FILENAME")
logger.info(f"PPR_FILENAME: {env3}")
env4 = os.environ.get("LANG")
logger.info(f"LANG: {env4}")
if "None" in [env1, env2, env3, env4]:
logger.info("Environment setup Failed!")
sys.exit(1)
else:
logger.info("Environment ready for processing")
return AbstractTextFile(filename, content)
class CalibrationLauncher(CasaLauncherIF):
def __init__(self, ppr: AbstractTextFile, metadata: AbstractTextFile):
class CasaLauncher:
def __init__(self, parameters: dict):
self.logger = logging.getLogger("casa_envoy")
self.ppr = ppr
self.metadata = metadata
self.parameters = parameters
def run(self):
self.logger.info("RUNNING CASA CALIBRATION!")
os.chdir("./working")
result = subprocess.Popen(
"PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\" "
"${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c "
"${CASA_HOME}/pipeline/pipeline/runvlapipeline.py ${PPR_FILENAME} || true",
shell=True,
executable="/bin/bash",
stdout=sys.stdout,
stderr=sys.stderr,
)
return result.communicate()
def setup_environment(self, parameters: dict):
get_base_environment(parameters)
os.environ["PPR_FILENAME"] = str(self.ppr)
check_processing_env(self.logger)
def check_logs(self, parent_path: str):
self.logger.info("CHECKING CASA CALIBRATION LOGS!")
# make sure we are in the correct directory to find log file
if not os.getcwd().endswith("/working"):
os.chdir(parent_path + "/working")
def setup_environment(self):
os.environ["SCIPIPE_ROOTDIR"] = self.parameters["rootDirectory"]
os.environ["CASA_HOME"] = self.parameters["homeForReprocessing"]
os.environ["LANG"] = "en_US.UTF-8"
os.environ["PPR_FILENAME"] = self.parameters["ppr"]
casa_logs = glob.glob("casa-*.log")
self.check_processing_env()
for file in casa_logs:
if re.match(r"^.*SEVERE\sflagmanager.*$", open(file).read()):
self.logger.error("CASA ERROR!")
else:
self.logger.info("CASA Success!")
def check_processing_env(self):
self.logger.info("Checking processing environment:")
env_list = ["SCIPIPE_ROOTDIR", "CASA_HOME", "PPR_FILENAME", "LANG"]
result_list = []
class ImagingLauncher(CasaLauncherIF):
def __init__(self, ppr: AbstractTextFile, metadata: AbstractTextFile):
self.logger = logging.getLogger("casa_envoy")
self.ppr = ppr
self.metadata = metadata
for var in env_list:
env = os.environ.get(var)
result_list.append(env)
self.logger.info(f"{var}: {env}")
if "None" in result_list:
self.logger.info("Environment setup Failed!")
sys.exit(1)
else:
self.logger.info("Environment ready for processing")
def run(self):
self.logger.info("RUNNING CASA IMAGING!")
os.chdir("./working")
result = subprocess.Popen(
"PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\" "
"${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c "
"${CASA_HOME}/pipeline/pipeline/runvlapipeline.py ${PPR_FILENAME} || true",
shell=True,
executable="/bin/bash",
stdout=sys.stdout,
stderr=sys.stderr,
)
return result.communicate()
def setup_environment(self, parameters: dict):
get_base_environment(parameters)
os.environ["PPR_FILENAME"] = str(self.ppr)
check_processing_env(self.logger)
def check_logs(self, parent_path: str):
self.logger.info("CHECKING CASA IMAGING LOGS!")
self.setup_environment()
if self.parameters.get("useCasa"):
self.logger.info("RUNNING CASA!")
os.chdir("./working")
result = subprocess.Popen(
"PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\" "
"${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c "
"${CASA_HOME}/pipeline/pipeline/runvlapipeline.py ${PPR_FILENAME} || true",
shell=True,
executable="/bin/bash",
stdout=sys.stdout,
stderr=sys.stderr,
)
return result.communicate()
else:
self.logger.info("RUNNING VELA!")
run_type = self.parameters.get("product_type")
metadata = self.parameters.get("metadata")
ppr = self.parameters.get("ppr")
subprocess.run(["./vela", run_type, metadata, ppr])
def check_logs(self):
self.logger.info("CHECKING CASA LOGS!")
# make sure we are in the correct directory to find log file
if not os.getcwd().endswith("/working"):
os.chdir(parent_path + "/working")
os.chdir(self.parameters.get("parent_path") + "/working")
casa_logs = glob.glob("casa-*.log")
......@@ -112,3 +90,83 @@ class ImagingLauncher(CasaLauncherIF):
self.logger.error("CASA ERROR!")
else:
self.logger.info("CASA Success!")
class CalibrationLauncher(LauncherIF):
def __init__(self, parameters: dict):
self.logger = logging.getLogger("casa_envoy")
self.parameters = parameters
self.ppr = get_abs_file(parameters.get("ppr"))
self.metadata = get_abs_file(parameters.get("metadata"))
def launch_casa(self):
if self.check_calibratable():
self.run_audit(self.parameters)
CasaLauncher(self.parameters).run()
else:
self.logger.error("ERROR: Provided SPL is not type execution block!")
sys.exit(1)
def check_calibratable(self) -> bool:
spl = self.metadata.content["productLocator"]
if "execblock" in spl:
return True
else:
self.logger.info("SPL ERROR: This product locator is not calibratable!")
return False
def run_audit(self, parameters: Dict[str, str]):
dir_audit = AuditDirectories(self.ppr, parameters).audit()
if dir_audit:
self.logger.info("Directory audit successful!")
else:
self.logger.error("FAILURE: directory structure audit was unsuccessful!")
sys.exit(1)
audit = AuditFiles([self.ppr, self.metadata], parameters).audit()
if audit:
self.logger.info("File audit successful!")
else:
self.logger.error("FAILURE: file audit was unsuccessful!")
sys.exit(1)
class ImagingLauncher(LauncherIF):
def __init__(self, parameters: dict):
self.logger = logging.getLogger("casa_envoy")
self.parameters = parameters
self.ppr = get_abs_file(parameters.get("ppr"))
self.metadata = get_abs_file(parameters.get("metadata"))
def launch_casa(self):
if self.check_imageable():
self.run_audit(self.parameters)
CasaLauncher(self.parameters).run()
else:
self.logger.error("ERROR: CMS information missing or incorrect!")
sys.exit(1)
def check_imageable(self) -> bool:
content = self.metadata.content
cms_name = content["cmsName"]
cms_path = content["calibrationSourceDirectory"]
if cms_name is not None and cms_path is not None and cms_name[-3:] == ".ms":
return True
else:
self.logger.info("CMS ERROR: Imaging requires a valid CMS name and location!")
return False
def run_audit(self, parameters: Dict[str, str]):
dir_audit = AuditDirectories(self.ppr, parameters).audit()
if dir_audit:
self.logger.info("Directory audit successful!")
else:
self.logger.info("FAILURE: directory structure audit was unsuccessful!")
sys.exit(1)
audit = AuditFiles([self.ppr, self.metadata], parameters).audit()
if audit:
self.logger.info("File audit successful!")
else:
self.logger.info("FAILURE: file audit was unsuccessful!")
sys.exit(1)
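For reference, a hedged sketch of how palaver now drives these launchers end to end; the parameters dict mirrors the shape _get_settings produces, with values taken or adapted from the test fixtures in this MR:

parameters = {
    "useCasa": False,
    "homeForReprocessing": "/home/casa/packages/pipeline/current",
    "rootDirectory": "/tmp/workspaces_tmp/",
    "processingDirectory": "tmpiox5trbp",
    "parent_path": "/tmp/workspaces_tmp/tmpiox5trbp",
    "metadata": "test/test.json",
    "ppr": "test/PPR.xml",
    "product_type": "standard-cal",
}
# launch_casa() audits first (directories, then files) and only on success
# hands off to CasaLauncher(parameters).run()
CalibrationLauncher(parameters).launch_casa()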
......@@ -5,14 +5,10 @@ palaver definition: an unnecessarily elaborate or complex procedure
import argparse
import logging
import pathlib
import subprocess
import sys
import os
import json
from typing import Dict
from pycapo import CapoConfig
from casa_envoy.auditor import AuditDirectories, AuditFiles
from casa_envoy.launchers import CalibrationLauncher, ImagingLauncher
"""
......@@ -24,20 +20,27 @@ logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
def _get_settings(cwd: pathlib.Path):
def _get_settings(cwd: pathlib.Path, args: list):
use_casa = CapoConfig().getboolean("edu.nrao.archive.workspaces.ProcessingSettings.useCasa")
casa_home = (
CapoConfig().settings("edu.nrao.archive.workflow.config.CasaVersions").homeForReprocessing
)
parent_path = cwd
root_dir = str(cwd.parent)
processing_dir = str(cwd.stem)
metadata = args[0]
ppr = args[1]
return {
"useCasa": use_casa,
"homeForReprocessing": casa_home,
"rootDirectory": root_dir,
"processingDirectory": processing_dir,
"parent_path": parent_path,
"metadata": metadata,
"ppr": ppr,
}
......@@ -63,87 +66,23 @@ def arg_parser() -> argparse.ArgumentParser:
return parser
def check_calibratable(metadata_filename: str) -> bool:
with open(metadata_filename) as json_meta:
metadata = json.loads(json_meta.read())
spl = metadata["productLocator"]
if "execblock" in spl:
return True
else:
logger.info("SPL ERROR: This product locator is not calibratable!")
return False
def check_imagable(metadata_filename: str) -> bool:
with open(metadata_filename) as json_meta:
metadata = json.loads(json_meta.read())
cms_name = metadata["cmsName"]
cms_path = metadata["calibrationSourceDirectory"]
if cms_name is not None and cms_path is not None and cms_name[-3:] == ".ms":
return True
else:
logger.info("CMS ERROR: Imaging requires a valid CMS name and location!")
return False
def run_audit(ppr: str, metadata: str, settings: Dict[str, str]):
dir_audit = AuditDirectories(ppr, settings).audit()
if dir_audit:
logger.info("Directory audit successful!")
else:
logger.info("FAILURE: directory structure audit was unsuccessful!")
sys.exit(1)
audit = AuditFiles([ppr, metadata], settings).audit()
if audit:
logger.info("File audit successful!")
else:
logger.info("FAILURE: file audit was unsuccessful!")
sys.exit(1)
def main():
args = arg_parser().parse_args()
path = os.getcwd()
settings = _get_settings(pathlib.Path(path))
if args.standard_cal is not None:
metadata = args.standard_cal[0]
ppr = args.standard_cal[1]
run_audit(ppr, metadata, settings)
if check_calibratable(metadata):
if settings.get("useCasa"):
launcher = CalibrationLauncher(ppr, metadata)
launcher.setup_environment(settings)
launcher.run()
launcher.check_logs(parent_path=path)
# make sure we return to the parent directory after processing
os.chdir(path)
else:
logger.info("RUNNING VELA!")
subprocess.run(["./vela", "--standard-cal", metadata, ppr])
else:
logger.error("ERROR: Provided SPL is not type execution block!")
sys.exit(1)
parameters = _get_settings(pathlib.Path(path), args.standard_cal)
parameters["product_type"] = "standard-cal"
CalibrationLauncher(parameters).launch_casa()
# make sure we return to the parent directory after processing
os.chdir(path)
elif args.standard_img is not None:
metadata = args.standard_img[0]
ppr = args.standard_img[1]
run_audit(ppr, metadata, settings)
if check_imagable(metadata):
if settings.get("useCasa"):
launcher = ImagingLauncher(ppr, metadata)
launcher.setup_environment(settings)
launcher.run()
launcher.check_logs(parent_path=path)
# return to parent directory after processing
os.chdir(path)
else:
logger.info("RUNNING VELA!")
subprocess.run(["./vela", "--standard-img", metadata, ppr])
else:
logger.error("ERROR: CMS information missing or incorrect!")
sys.exit(1)
parameters = _get_settings(pathlib.Path(path), args.standard_img)
parameters["product_type"] = "standard-img"
ImagingLauncher(parameters).launch_casa()
# return to parent directory after processing
os.chdir(path)
......@@ -2,6 +2,7 @@
"fileSetIds": "brain_000.58099.67095825232",
"workflowName": "std_cms_imaging",
"systemId": "4",
"creationTime": "2021-07-01T21:14:00",
"productLocator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
"projectMetadata": {
"projectCode": "Operations",
......
......@@ -2,6 +2,7 @@
"fileSetIds": "brain_001.58099.678886747686",
"workflowName": "std_calibration",
"systemId": "5",
"creationTime": "2021-05-06T17:34:36",
"productLocator": "uid://evla/execblock/9cb0964b-ad6b-40ed-bd87-d08c502503e2",
"projectMetadata": {
"projectCode": "Operations",
......@@ -10,4 +11,4 @@
"observer": "VLA Operations"
},
"destinationDirectory": "/tmp/workspaces_tmp/testing"
}
\ No newline at end of file
}
......@@ -13,104 +13,139 @@ from casa_envoy.auditor import (
)
from casa_envoy.schema import AbstractTextFile
settings = {
cal_settings = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp"
"processingDirectory": "tmpiox5trbp",
"metadata": "test/test.json",
"ppr": "test/PPR.xml",
"product_type": "standard-cal",
}
test_ppr = AbstractTextFile(filename="test/PPR.xml", content=Path("test/PPR.xml").read_text())
test_cal_metadata = AbstractTextFile(
filename="test/test.json", content=Path("test/test.json").read_text()
)
img_settings = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
"metadata": "test/image-metadata.json",
"ppr": "test/cmsimage-PPR.xml",
"product_type": "standard-img",
}
test_img_ppr = AbstractTextFile(
filename="test/cmsimage-PPR.xml", content=Path("test/cmsimage-PPR.xml").read_text()
)
test_img_metadata = AbstractTextFile(
filename="test/image-metadata.json", content=Path("test/image-metadata.json").read_text()
)
def test_get_fields_for():
fields = ["fileSetIds", "workflowName", "systemId", "productLocator", "destinationDirectory"]
result = get_fields_for("test/test.json")
product_type = "standard-cal"
fields = [
"fileSetIds",
"workflowName",
"systemId",
"creationTime",
"productLocator",
"destinationDirectory",
]
result = get_fields_for(product_type=product_type, filename=test_cal_metadata.filename)
assert result == fields
fields2 = ["RootDirectory", "RelativePath", "SdmIdentifier"]
result2 = get_fields_for("test/PPR.xml")
result2 = get_fields_for(product_type=product_type, filename=test_ppr.filename)
assert result2 == fields2
product_type2 = "standard-img"
img_fields = [
"fileSetIds",
"workflowName",
"systemId",
"creationTime",
"productLocator",
"destinationDirectory",
"cms_path",
"sdmId",
]
result = get_fields_for(product_type=product_type2, filename=test_img_metadata.filename)
assert result == img_fields
def test_get_xml_content():
content = get_xml_content("test/PPR.xml")
content = get_xml_content(test_ppr)
assert content.find("RelativePath").string == "tmpiox5trbp"
assert content.find("SdmIdentifier").string == "brain_001.58099.678886747686"
assert content.find("CreationTime").string == "2021-05-06T17:34:36"
def test_get_value_for():
value = get_value_for("test/PPR.xml", "RelativePath")
value = get_value_for(test_ppr, "RelativePath")
assert value == "tmpiox5trbp"
value = get_value_for("test/PPR.xml", "SdmIdentifier")
value = get_value_for(test_ppr, "SdmIdentifier")
assert value == "brain_001.58099.678886747686"
value = get_value_for("test/test.json", "workflowName")
value = get_value_for(test_cal_metadata, "workflowName")
assert value == "std_calibration"
value = get_value_for("test/test.json", "productLocator")
value = get_value_for(test_cal_metadata, "productLocator")
assert value == "uid://evla/execblock/9cb0964b-ad6b-40ed-bd87-d08c502503e2"
class TestAuditFiles:
def test_read_file(self):
file = AuditFiles(["test/test.json", "test/PPR.xml"], settings).read_file("test/test.json")
assert file.filename == "test/test.json"
file2 = AuditFiles(["test/test.json", "test/PPR.xml"], settings).read_file("test/PPR.xml")
assert file2.filename == "test/PPR.xml"
def test_check_required_fields(self):
file = AuditFiles(["test/test.json", "test/PPR.xml"], settings).read_file("test/test.json")
audit_files = AuditFiles([test_cal_metadata, test_ppr], cal_settings)
fields = [
"fileSetIds",
"workflowName",
"systemId",
"creationTime",
"productLocator",
"destinationDirectory",
]
result = AuditFiles(["test/test.json", "test/PPR.xml"], settings).check_required_fields(file, fields)
result = audit_files.check_required_fields(test_cal_metadata, fields)
assert result is True
fieldsF = [
"fileSetIds",
"workflowName",
"systemId",
"creationTime",
"productLocator",
"destinationDirectory",
"FakeField",
]
resultF = AuditFiles(["test/test.json", "test/PPR.xml"], settings).check_required_fields(
file, fieldsF
)
resultF = audit_files.check_required_fields(test_cal_metadata, fieldsF)
assert resultF is False
file2 = AuditFiles(["test/test.json", "test/PPR.xml"], settings).read_file("test/PPR.xml")
fields2 = ["RootDirectory", "RelativePath", "SdmIdentifier"]
result2 = AuditFiles(["test/test.json", "test/PPR.xml"], settings).check_required_fields(
file2, fields2
)
result2 = audit_files.check_required_fields(test_ppr, fields2)
assert result2 is True
fields2F = ["RootDirectory", "RelativePath", "SdmIdentifier", "FakeField"]
result2F = AuditFiles(["test/test.json", "test/PPR.xml"], settings).check_required_fields(
file2, fields2F
)
result2F = audit_files.check_required_fields(test_ppr, fields2F)
assert result2F is False
@patch("os.chdir")
@patch("shutil.copy")
def test_correct_for_condor(self, mock_copy, mock_os):
ppr = AuditFiles(["test/test.json", "test/PPR.xml"], settings).correct_for_condor(ppr=test_ppr)
ppr = AuditFiles([test_cal_metadata, test_ppr], cal_settings).correct_for_condor(
ppr=test_ppr
)
assert ppr.filename == "test/PPR.xml"
def test_audit(self):
result = AuditFiles(["test/test.json", "test/PPR.xml"], settings=settings).audit()
result = AuditFiles([test_cal_metadata, test_ppr], cal_settings).audit()
assert result is True
class TestAuditDirectories:
def test_audit(self):
result = AuditDirectories("test/PPR.xml", settings).audit()
result = AuditDirectories(test_ppr, cal_settings).audit()
assert result is False
......@@ -4,32 +4,59 @@ Tests for casa_envoy.palaver
import argparse
from unittest.mock import patch, MagicMock
from casa_envoy.palaver import check_calibratable, check_imagable, _get_settings
import casa_envoy.palaver as palaver
expected_settings = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool",
"processingDirectory": "tmpo1ca1pp_",
"parent_path": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_",
"metadata": "test/test.json",
"ppr": "test/PPR.xml",
"product_type": "standard-cal",
}
args = argparse.Namespace()
def test_check_calibratable():
check = check_calibratable("test/test.json")
assert check is True
class TestPalaver:
def test_get_settings(self):
args.standard_cal = ["test/test.json", "test/PPR.xml"]
with patch(
"pathlib.Path.cwd",
MagicMock(
return_value="/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_"
),
) as cwd:
settings = palaver._get_settings(cwd, args.standard_cal)
assert settings["useCasa"] == expected_settings["useCasa"]
assert settings["homeForReprocessing"] == expected_settings["homeForReprocessing"]
assert settings["metadata"] == expected_settings["metadata"]
assert settings["ppr"] == expected_settings["ppr"]
def test_check_imagable():
check = check_imagable("test/image-metadata.json")
assert check is True
args.standard_cal = None
@patch("os.chdir")
@patch("os.getcwd")
def test_main_cal(self, mock_cwd, mock_chdir):
args.standard_cal = ["test/test.json", "test/PPR.xml"]
def test_get_settings():
with patch(
"pathlib.Path.cwd",
MagicMock(return_value="/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpo1ca1pp_"),
) as cwd:
settings = _get_settings(cwd)
assert settings["useCasa"] == expected_settings["useCasa"]
assert settings["homeForReprocessing"] == expected_settings["homeForReprocessing"]
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
with patch("casa_envoy.launchers.CalibrationLauncher.launch_casa") as cal_launcher:
palaver.main()
assert cal_launcher.call_count == 1
args.standard_cal = None
@patch("os.chdir")
@patch("os.getcwd")
def test_main_img(self, mock_cwd, mock_chdir):
args.standard_img = ["test/image-metadata.json", "test/cmsimage-PPR.xml"]
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
with patch("casa_envoy.launchers.ImagingLauncher.launch_casa") as img_launcher:
palaver.main()
assert img_launcher.call_count == 1
args.standard_img = None
import os
from unittest.mock import mock_open, patch
from casa_envoy.launchers import CalibrationLauncher, ImagingLauncher
from casa_envoy.launchers import CalibrationLauncher, ImagingLauncher, CasaLauncher
cal_parameters = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
"metadata": "test/test.json",
"ppr": "test/PPR.xml",
"product_type": "standard-cal",
}
img_parameters = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
"metadata": "test/image-metadata.json",
"ppr": "test/cmsimage-PPR.xml",
"product_type": "standard-img",
}
class TestCalibrationLauncher:
@patch("subprocess.Popen")
class TestCasaLauncher:
def test_setup_environment(self):
CasaLauncher(parameters=cal_parameters).setup_environment()
assert os.environ.get("SCIPIPE_ROOTDIR") == cal_parameters["rootDirectory"]
assert os.environ.get("CASA_HOME") == cal_parameters["homeForReprocessing"]
assert os.environ.get("PPR_FILENAME") == "test/PPR.xml"
@patch("subprocess.run")
@patch("os.chdir")
def test_run(self, mock_os, mock_subprocess):
CalibrationLauncher("test/PPR.xml", "test/test.json").run()
CasaLauncher(parameters=cal_parameters).run()
assert mock_subprocess.call_count == 1
def test_setup_environment(self):
parameters = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
}
CalibrationLauncher("PPR.xml", "test.json").setup_environment(parameters=parameters)
assert os.environ.get("SCIPIPE_ROOTDIR") == parameters["rootDirectory"]
assert os.environ.get("CASA_HOME") == parameters["homeForReprocessing"]
assert os.environ.get("PPR_FILENAME") == "PPR.xml"
@patch("builtins.open")
@patch("glob.glob")
@patch("os.chdir")
@patch("os.getcwd")
def test_check_logs(self, mock_os_cwd, mock_os_dir, mock_glob, mock_open):
CalibrationLauncher("PPR.xml", "test.json").check_logs(parent_path=".")
CasaLauncher(parameters=cal_parameters).check_logs()
assert mock_os_cwd.call_count == 1
assert mock_os_dir.call_count == 0
assert mock_glob.call_count == 1
class TestImagingLauncher:
@patch("os.chdir")
def test_run(self, mock_os):
with patch("subprocess.Popen") as sp:
ImagingLauncher("test/cmsimage-PPR.xml", "test/image-metadata.json").run()
assert sp.call_count == 1
class TestCalibrationLauncher:
@patch("casa_envoy.launchers.CalibrationLauncher.run_audit")
@patch("casa_envoy.launchers.CasaLauncher.run")
def test_launch_casa(self, mock_run, mock_audit):
CalibrationLauncher(parameters=cal_parameters).launch_casa()
assert mock_run.call_count == 1
assert mock_audit.call_count == 1
def test_setup_environment(self):
parameters = {
"useCasa": False,
"homeForReprocessing": "/home/casa/packages/pipeline/current",
"rootDirectory": "/tmp/workspaces_tmp/",
"processingDirectory": "tmpiox5trbp",
}
ppr = "cmsimage-PPR.xml"
metadata = "image-metadata.json"
ImagingLauncher(ppr, metadata).setup_environment(parameters=parameters)
assert os.environ.get("SCIPIPE_ROOTDIR") == parameters["rootDirectory"]
assert os.environ.get("CASA_HOME") == parameters["homeForReprocessing"]
assert os.environ.get("PPR_FILENAME") == ppr
def test_check_calibratable(self):
check = CalibrationLauncher(parameters=cal_parameters).check_calibratable()
assert check is True
@patch("builtins.open")
@patch("glob.glob")
@patch("os.chdir")
@patch("os.getcwd")
def test_check_logs(self, mock_os_cwd, mock_os_dir, mock_glob, mock_open):
ppr = "cmsimage-PPR.xml"
metadata = "image-metadata.json"
ImagingLauncher(ppr, metadata).check_logs(parent_path=".")
assert mock_os_cwd.call_count == 1
assert mock_os_dir.call_count == 0
assert mock_glob.call_count == 1
class TestImagingLauncher:
@patch("casa_envoy.launchers.CalibrationLauncher.run_audit")
@patch("casa_envoy.launchers.CasaLauncher.run")
def test_launch_casa(self, mock_run, mock_audit):
CalibrationLauncher(parameters=img_parameters).launch_casa()
assert mock_run.call_count == 1
assert mock_audit.call_count == 1
def test_check_imageable(self):
check = ImagingLauncher(parameters=img_parameters).check_imageable()
assert check is True
......@@ -5,8 +5,6 @@ import argparse
from unittest.mock import patch, MagicMock
import conveyor.conveyor as con
from conveyor.deliver import DeliveryConveyor
from conveyor.retrieve import RetrievalConveyor
expected_settings = {
"qa_delivery_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/qa2",
......@@ -26,14 +24,17 @@ class TestConveyor:
settings = con._get_settings("test/test.json")
assert settings["qa_delivery_area"] == expected_settings["qa_delivery_area"]
assert settings["weblog_cache_area"] == expected_settings["weblog_cache_area"]
assert settings["workspaces_lustre_root_dir"] == expected_settings["workspaces_lustre_root_dir"]
assert (
settings["workspaces_lustre_root_dir"]
== expected_settings["workspaces_lustre_root_dir"]
)
# mock calls to cwd and count
assert cwd.call_count == 2
assert settings["destination_dir"] == expected_settings["destination_dir"]
assert settings["destination_subdir"] == expected_settings["destination_subdir"]
def test_main_deliver(self):
args.deliver = ['test/test.json']
args.deliver = ["test/test.json"]
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
assert args.deliver[0] == "test/test.json"
......@@ -45,7 +46,7 @@ class TestConveyor:
args.deliver = None
def test_main_retrieve(self):
args.retrieve = ['test/test.json']
args.retrieve = ["test/test.json"]
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
with patch("conveyor.retrieve.RetrievalConveyor.convey") as ret_convey:
......
import argparse
import logging
import sys
from pycapo import CapoConfig
from ingest_envoy.launchers import IngestCalibrationLauncher
from ingest_envoy.solicitor import Solicitor
"""
Setup and Launch ingestion via Workspaces
"""
logger = logging.getLogger("ingest_envoy")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
def _get_settings(filename: str) -> dict:
ingestion_settings = CapoConfig().settings("edu.nrao.archive.workspaces.IngestionSettings")
staging_root_dir = ingestion_settings.stagingDirectory
storage_root_dir = ingestion_settings.storageDirectory
parameters = Solicitor(filename).solicit_parameters()
parameters["staging_area"] = staging_root_dir
parameters["storage_area"] = storage_root_dir
return parameters
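A sketch of what _get_settings assembles; the metadata values come from test/test.json elsewhere in this MR, while the staging and storage paths are placeholders for whatever CAPO returns:

parameters = {
    "sdmId": "brain_001.58099.678886747686",
    "workflowName": "std_calibration",
    "spl": "uid://evla/execblock/9cb0964b-ad6b-40ed-bd87-d08c502503e2",
    "processingStart": "2021_05_06T17_34_36",
    "destinationDir": "/tmp/workspaces_tmp/testing",
    "workflowDir": "testing",
    "staging_area": "...",   # IngestionSettings.stagingDirectory
    "storage_area": "...",   # IngestionSettings.storageDirectory
}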
def arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Workspaces Ingestion handler", formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
"--calibration",
nargs=1,
action="store",
required=False,
help="run ingestion for a calibration product",
)
parser.add_argument(
"--image",
nargs=1,
action="store",
required=False,
help="run ingestion for an image product (anticipated functionality)",
)
return parser
def check_ingest_type(args_type: str, parameters: dict) -> bool:
if args_type in parameters["workflowName"]:
return True
return False
def main():
args = arg_parser().parse_args()
if args.calibration is not None:
arg_type = "calibration"
parameters = _get_settings(args.calibration[0])
if check_ingest_type(args_type=arg_type, parameters=parameters):
IngestCalibrationLauncher(parameters).launch_ingestion()
else:
logger.error(
f"ERROR: The workflow request to be ingested does not match specified ingestion type {type}."
)
""" The ingestion manifest """
import json
""" This is the entrypoint for ingestion launching """
# pylint: disable=E0402, R0903, R0913
from typing import Dict
import logging
import sys
from pathlib import Path
from .utilities import Telescope
# pylint: disable=R0903
from ingest_envoy.ingestion_manifest_writer import EvlaCalIngestionManifestWriter
from ingest_envoy.utilities import ScienceProductType
class Parameters:
"""a manifest's various input parameters"""
def __init__(
self,
telescope: Telescope,
# for JSON parsing, path must be a string
ingestion_path: str,
additional_metadata: str,
collection_metadata: str,
reingest: bool = False,
ngas_ingest: bool = False,
):
self.telescope = telescope
self.ingestion_path = ingestion_path
self.additional_metadata = additional_metadata
self.collection_metadata = collection_metadata
self.reingest = reingest
self.ngas_ingest = ngas_ingest
def __repr__(self):
return repr(self.__dict__)
class InputGroup:
"""
This represents the starting point for processing which generated a science product.
There is not always an input group for every output group (rawdata, for instance).
Initial assumption: Input groups consist only of science products.
"""
def __init__(self):
self.science_products = []
def __repr__(self) -> str:
return repr(self.__dict__)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
class IngestionManifest:
"""Represents JSON layout of ingestion information, encompassing several potential scenarios.
see ingest_envoy/test/examples, nicked from https://open-confluence.nrao.edu/x/roPCAQ
"""
def __init__(self, parameters: Parameters):
self.parameters = parameters
self.input_group = InputGroup()
if self.parameters.additional_metadata:
self.input_group.science_products.append(
json.loads(self.parameters.additional_metadata)
)
"""needed for ingestion-launching interface"""
self.output_group = OutputGroup()
if self.parameters.collection_metadata:
self.output_group.ancillary_products.append(
json.loads(self.parameters.collection_metadata)
)
self.ingestion_path = self.parameters.ingestion_path
# TODO: what is this, and how do we use it?
self.associate_group = AssociateGroup()
def __init__(self, staging_source_dir: str, ingestion_type: str, locator: str):
self.ingest_path = Path(staging_source_dir)
self.sp_type = ScienceProductType.from_str(ingestion_type)
self.locator = locator
def content(self) -> Dict:
def create(self):
"""
Accessor for manifest content
Create the ingestion manifest in this directory for a product of this type,
identified by this locator.
:return: manifest as dict
:return:
"""
return dict(
input_group=repr(self.input_group),
output_group=repr(self.output_group),
associate_group=repr(self.associate_group),
ingestion_path=repr(self.ingestion_path),
science_products=repr(self.input_group.science_products),
ancillary_products=repr(self.output_group.ancillary_products),
)
class OutputGroup:
"""Represents result of data processing"""
def __init__(self):
self.science_products = []
self.ancillary_products = []
def __repr__(self):
return repr(self.__dict__)
class AssociateGroup:
"""
A representation of Science Products which are not part of the same Input or Output groups
but are still fundamentally linked. Created for RealFast project, to link the RealFast
specific execution block & image to the execution block during which a transient was
discovered.
Associate groups also, by definition, include any science product(s) within the output
group to be ingested. The new locators generated at ingestion time will be added to any
which compose an associate group in the manifest.
"""
def __init__(self):
self.science_products = []
if self.sp_type != ScienceProductType.EVLA_CAL:
return NotImplementedError(
f"Don't yet know how to handle {self.sp_type.value} science product"
)
def __repr__(self):
return repr(self.__dict__)
writer = EvlaCalIngestionManifestWriter(self.ingest_path)
writer.write_evla_cal_manifest(self.locator)
"""Build an ingestion manifest file"""
import json
import logging
import shutil
import re
import sys
import tarfile
from pathlib import Path
from typing import Tuple, List
from typing import List, Tuple, Dict
# pylint: disable=E0401, E0402
# pylint: disable=E0401, E0402,R1721
import pendulum
from pendulum import DateTime
from .ingestion_manifest import IngestionManifest
from .utilities import (
MANIFEST_NAME_BASE,
MANIFEST_NAME_EXT,
ARTIFACT_NAME,
ARTIFACT_EXT,
WEBLOG,
SCIENCE_PRODUCT_PATTERN,
EvlaCalInputScienceProduct,
EvlaCalInputGroup,
EvlaCalOutputScienceProduct,
EvlaCalOutputGroup,
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
MANIFEST_NAME_BASE = "ingestion_manifest_"
MANIFEST_NAME_EXT = ".json"
ARTIFACT_NAME = "ingestion_artifacts_"
ARTIFACT_EXT = ".tar"
class EvlaCalIngestionManifestWriter:
"""For building ingestion manifests"""
class IngestionManifestWriter:
"""Uses supplied parameters to build ingestion manifest files
for the various types of ingestion"""
# (science product type is always EVLA_CAL)
def __init__(self, ingest_path: Path):
self.ingest_path = ingest_path
self.input_group = self.output_group = self.infiles = None
def __init__(
self,
manifest: IngestionManifest,
working_dir: Path,
):
self.manifest = manifest
self.working_dir = working_dir
self.parameters = manifest.parameters
self.manifest_filename, self.artifact_filename = self.build_filenames()
def build(self) -> Path:
def write_evla_cal_manifest(self, locator: str) -> Tuple[Path, List[Path]]:
"""
Write the ingestion manifest indicated by the parameters.
Write the manifest and associated ingestion files
:return: the ingestion manifest constructed from the parameters
:param locator: science product locator
:return:
"""
self.infiles = [file for file in self.ingest_path.iterdir()]
self.input_group = EvlaCalInputGroup(EvlaCalInputScienceProduct(locator))
self.output_group = self._make_evla_cal_output_group()
# Pull out the manifest content and stringify it
manifest_content = json.dumps(self.manifest.content())
manifest_content = json.dumps(self.content(), indent=4)
# Write the manifest to the staging area, where other files may be.
ingestion_location = self.parameters.ingestion_path
staging_manifest = Path(ingestion_location) / self.manifest_filename
manifest_filename = self.manifest_filename()
# Write the manifest to the staging area
staging_manifest = Path(self.ingest_path / manifest_filename)
with open(staging_manifest, "w") as out:
out.write(manifest_content)
out.write(f"{manifest_content}\n")
# Get all the files we'll need
addl_ingestion_files = self.find_additional_ingestion_files()
# # Open up permissions so we can delete the manifest file later.
# rw_mode = 0o666
# staging_manifest.chmod(rw_mode)
# Write the artifacts tar.
artifacts_tar = self.write_ingestion_artifacts_tar(staging_manifest, addl_ingestion_files)
addl_ingestion_files.append(artifacts_tar)
# Get all the files we'll need....
ingestion_files = self.find_ingestion_files()
# there should be at least one file, the manifest
if len(ingestion_files) < 1:
raise ValueError(f"no ingestion files nor manifest found at {ingestion_location}")
# return a Path explicitly; LocalPath won't work
return Path(staging_manifest), addl_ingestion_files
# ....and put them in both places, with rw permissions
for filename in ingestion_files:
staging_dir_copy = ingestion_location / filename
working_dir_copy = self.working_dir / filename
shutil.copy(str(staging_manifest), str(working_dir_copy))
def write_ingestion_artifacts_tar(
self, manifest_file: Path, ingestion_files: List[Path]
) -> Path:
"""
Take the list of files and build a tar for inclusion into the archive.
This happens in the staging area for ingestion.
The EVLA CAL tar will contain just the manifest
# Rename the manifest to the shared name decided on for ingestion invocation.
# Path() cast "shouldn't" be necessary, but if we don't do it,
# file is a LocalPath and we can't create the symlink
generalized = Path(self.working_dir / "ingestion-manifest.json")
generalized.symlink_to(working_dir_copy, target_is_directory=False)
:param manifest_file: the ingestion manifest
:param ingestion_files: all the files needed for ingestion
:return: a .tar archive of the ingestion artifacts
"""
ingestion_artifacts_fn = self.ingest_path / "ingestion_artifacts.tar"
with tarfile.open(ingestion_artifacts_fn, "w") as ingestion_artifacts_tar:
for file in ingestion_files:
ingestion_artifacts_tar.add(file)
# Now that all the loose files are copied, create the ingestion artifacts tar
self.write_ingestion_artifact_tar(ingestion_files)
# include the manifest
if manifest_file not in ingestion_files:
ingestion_artifacts_tar.add(manifest_file)
# again: return a Path explicitly, for a LocalPath won't work
return Path(staging_manifest)
return ingestion_artifacts_fn
@staticmethod
def format_timestamp(start_time: DateTime) -> str:
def _make_evla_cal_output_group(self):
"""
Format the current time as follows:
input format:
2021-07-01T13:49:17.237119+00:00
desired output format as yyyy_MM_dd_'T'HH_mm_ss.SSS:
2021_07_01'T'13_49_17.237
Create the JSON for the "output-group" section of the manifest.
An EVLA CAL OutputGroup contains a science product(s) and an ancillary product
(the weblog)
:param start_time: current pendulum timestamp
:return:
:return: manifest output group
"""
sp_tar = self._find_science_product_tar()
science_product = EvlaCalOutputScienceProduct(str(sp_tar))
weblog = Path(self.ingest_path / WEBLOG)
if weblog.exists():
return EvlaCalOutputGroup(science_product, weblog)
time_str = str(start_time)
# change hyphens and colons to underscores
timestamp = time_str.replace("-", "_", len(time_str))
timestamp = timestamp.replace(":", "_", len(timestamp))
# chop off the last bit
timestamp = timestamp.split("+")[0]
# now timestamp ends with ss.###....; round to 3 places
ts_parts = timestamp.split("_")
seconds = float(ts_parts[len(ts_parts) - 1])
rounded = round(seconds, 3)
timestamp = timestamp.replace(str(seconds), str(rounded))
# finally, the T in single quotes
timestamp = timestamp.replace("T", "'T'")
return EvlaCalOutputGroup(science_product, None)
return timestamp
def content(self) -> Dict[str, str]:
"""
Accessor for manifest content
def build_filenames(self) -> Tuple[str, str]:
:return: manifest as dict
"""
Build manifest and artifacts filenames.
:return: the filenames
params = {
"reingest": "false",
"ngas_ingest": "false",
"calibrate": "false",
"ingestion_path": str(self.ingest_path),
}
json_out = dict(
parameters=params,
input_group=self.input_group.__json__(),
output_group=self.output_group.__json__(),
# associate_group=self.associate_group.__json__(), # TODO when we need it
ingestion_path=str(self.ingest_path),
)
return json_out
def _find_science_product_tar(self) -> Path:
"""
current_time = pendulum.now()
timestamp = self.format_timestamp(current_time)
manifest_filename = f"{MANIFEST_NAME_BASE}{timestamp}{MANIFEST_NAME_EXT}"
artifact_filename = f"{ARTIFACT_NAME}{timestamp}{ARTIFACT_EXT}"
return manifest_filename, artifact_filename
A calibration ingestion staging dir should have ONE science product tar; ignore any others
def find_ingestion_files(self) -> List[Path]:
:return:
"""
Gather the files required for ingestion
files = [file for file in self.ingest_path.iterdir() if file.is_file]
for file in files:
if re.match(SCIENCE_PRODUCT_PATTERN, file.name):
return file
:return: ingestion inputs
raise FileNotFoundError(f"no science product found at {self.ingest_path}")
@staticmethod
def format_timestamp(datetime: DateTime) -> str:
"""
Format the current time as
2021_07_01T13_49_17.237
coll_files = aux_files = []
:param datetime: current timestamp
:return: timestamp suitable for ingestion manifest filename
"""
if self.parameters.additional_metadata is not None:
aux_string = self.parameters.additional_metadata
aux_files = aux_string.split(",")
return datetime.format("YYYY_MM_DDThh_mm_ss.SSS")
if self.parameters.collection_metadata is not None:
coll_str = self.parameters.collection_metadata
coll_files = coll_str.split(",")
@staticmethod
def manifest_filename() -> str:
"""
Build unique manifest filename in standard format.
aux_files += coll_files
:return: the filename
"""
current_time = pendulum.now()
timestamp = EvlaCalIngestionManifestWriter.format_timestamp(current_time)
return f"{MANIFEST_NAME_BASE}{timestamp}{MANIFEST_NAME_EXT}"
# be sure to add the manifest itself
aux_files.append(self.manifest_filename)
@staticmethod
def artifact_filename() -> str:
"""
Build manifest and artifacts filenames.
return aux_files
:return: the filenames
"""
current_time = pendulum.now()
timestamp = EvlaCalIngestionManifestWriter.format_timestamp(current_time)
artifact_filename = f"{ARTIFACT_NAME}{timestamp}{ARTIFACT_EXT}"
return artifact_filename
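An editorial note on the new timestamp code: pendulum's format() replaces the manual string surgery shown above. A quick sketch (pendulum's tokens follow moment.js conventions, so "HH" is the 24-hour clock while the "hh" used in this diff is the 12-hour clock):

import pendulum

ts = pendulum.datetime(2021, 7, 1, 13, 49, 17, 237119)
ts.format("YYYY_MM_DDTHH_mm_ss.SSS")  # -> "2021_07_01T13_49_17.237"
ts.format("YYYY_MM_DDThh_mm_ss.SSS")  # -> "2021_07_01T01_49_17.237"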
def write_ingestion_artifact_tar(self, ingestion_files: List[Path]) -> Path:
def find_additional_ingestion_files(self) -> List[Path]:
"""
Take the list of files and build a tar for inclusion into the archive.
This happens in the staging area for ingestion.
Gather the files required for ingestion.
There won't be any for EVLA CAL ingestion; this is just a placeholder.
:param ingestion_files: all the files needed for ingestion
:return: a .tar archive of the ingestion artifacts
:return: ingestion inputs
"""
return []
# TODO: we'll have extra information for other ingestion types
# coll_files = aux_files = []
#
# if self.additional_metadata is not None:
# addl_md = json.loads(self.additional_metadata)
# aux_files.append(addl_md["filename"])
#
# if self.collection_metadata is not None:
# coll_md = json.loads(self.collection_metadata)
# coll_files.append(coll_md["filename"])
#
# aux_files += coll_files
#
# # be sure to add the manifest itself
# aux_files.append(self.manifest_filename)
#
# return list(set(aux_files))
"""
Interfaces for ingest_envoy
"""
import abc
from abc import ABC
class IngestLauncherIF(ABC):
"""
Generic Ingestion Launcher methods.
Should be implemented for any type of ingestion launcher
"""
# launch all ingest steps, should be called in ingest.main
@abc.abstractmethod
def launch_ingestion(self):
pass
# run ingest pex
@abc.abstractmethod
def run_ingest(self):
pass
# setup workflow results for ingestion, ensure placement in staging area
@abc.abstractmethod
def prepare_for_ingest(self):
pass
# create ingestion manifest
@abc.abstractmethod
def create_manifest(self):
pass
import logging
import subprocess
import sys
from ingest_envoy.ingestion_manifest import IngestionManifest
from ingest_envoy.interfaces import IngestLauncherIF
class IngestCalibrationLauncher(IngestLauncherIF):
def __init__(self, parameters: dict):
self.logger = logging.getLogger("ingest_envoy")
self.ingestion_type = "calibration"
self.parameters = parameters
self.staging_source_dir = (
self.parameters["staging_area"] + "/" + self.parameters["workflowDir"]
)
def launch_ingestion(self):
self.prepare_for_ingest()
self.run_ingest()
def run_ingest(self):
subprocess.run(
["./ingest", "--json", self.staging_source_dir],
stdout=sys.stdout,
stderr=sys.stderr,
)
def prepare_for_ingest(self):
# 1. run collection script to create calibration tarfile
self.run_collection_script()
# 2. create ingestion manifest
self.create_manifest()
def run_collection_script(self):
workflow_dir = self.parameters["workflowDir"]
sdm_id = self.parameters["sdmId"]
cal_processing_datetime = self.parameters["processingStart"]
# run script
subprocess.run(
["./calibration-table-collector.sh", workflow_dir, sdm_id, cal_processing_datetime],
stdout=sys.stdout,
stderr=sys.stderr,
)
def create_manifest(self):
spl = self.parameters["spl"]
IngestionManifest(self.staging_source_dir, self.ingestion_type, spl).create()
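For reference, a hedged sketch of driving the calibration ingestion launcher; the dict keys match what Solicitor and ingest._get_settings produce, but the workflowDir and staging_area values here are invented:

parameters = {
    "workflowDir": "tmpabcd1234",   # invented
    "sdmId": "brain_001.58099.678886747686",
    "processingStart": "2021_05_06T17_34_36",
    "spl": "uid://evla/execblock/9cb0964b-ad6b-40ed-bd87-d08c502503e2",
    "staging_area": "...",          # invented
}
# runs calibration-table-collector.sh, writes the ingestion manifest into
# the staging directory, then invokes ./ingest on that directory
IngestCalibrationLauncher(parameters).launch_ingestion()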
def main():
print("Hello, world!")
\ No newline at end of file
"""
Solicitor: uses metadata.json to determine input parameters for ingestion
"""
import json
import pathlib
class Solicitor:
def __init__(self, filename: str):
self.filename = filename
self.metadata = self.solicit_contents()
def solicit_contents(self) -> dict:
with open(self.filename) as file:
metadata = json.loads(file.read())
return metadata
def solicit_workflow_directory_name(self):
destination_path = pathlib.Path(self.metadata["destinationDirectory"])
return destination_path.stem
def solicit_processing_creation_time(self):
# incoming datetime: 2021-07-06T21:50:48
# transformation for NGAS friendliness: 2021_07_06T21_50_48
creation_time = self.metadata["creationTime"].replace("-", "_").replace(":", "_")
return creation_time
def solicit_parameters(self):
metadata = self.solicit_contents()
return {
"sdmId": metadata["fileSetIds"],
"workflowName": metadata["workflowName"],
"spl": metadata["productLocator"],
"processingStart": self.solicit_processing_creation_time(),
"destinationDir": metadata["destinationDirectory"],
"workflowDir": self.solicit_workflow_directory_name(),
}
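A short usage sketch of Solicitor against the test/test.json fixture in this MR:

solicitor = Solicitor("test/test.json")
params = solicitor.solicit_parameters()
params["workflowDir"]      # "testing" (the stem of /tmp/workspaces_tmp/testing)
params["processingStart"]  # "2021_05_06T17_34_36" (creationTime made NGAS-friendly)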
""" Objects pertaining to the various ingestion manifests """
from __future__ import annotations
import json
import re
import tarfile
from enum import Enum
from pathlib import Path
from typing import List, Dict
# pylint: disable=E0401, R0903, R1721
MANIFEST_NAME_BASE = "ingestion_manifest_"
MANIFEST_NAME_EXT = ".json"
ARTIFACT_NAME = "ingestion_artifacts_"
ARTIFACT_EXT = ".tar"
WEBLOG = "weblog.tgz"
# pylint: disable=R0903
SCIENCE_PRODUCT_PATTERN = re.compile("[a-zA-Z0-9._\\-+]*\\.tar")
class Telescope(Enum):
"""Codifying the names of our telescopes, because Janet can't abide magic strings"""
VLA = 1
EVLA = 2
ALMA = 3
VLBA = 4
GBT = 5
NONE = 6
VLA = "VLA"
EVLA = "EVLA"
ALMA = "ALMA"
VLBA = "VLBA"
GBT = "GBT"
NONE = "NONE"
class IngestionType(Enum):
......@@ -55,24 +71,35 @@ class ScienceProductType(Enum):
"""Canonical collection of ingestible types of science products"""
EXEC_BLOCK = "execution_block"
CAL = "calibration"
EVLA_CAL = "calibration"
CATALOG = "catalog"
IMAGE = "image"
def __str__(self):
return f'"{str(self.value)}"'
class ScienceProduct:
"""Represents a science product in an ingestion manifest"""
@staticmethod
def from_str(sp_type_in) -> ScienceProductType:
"""
In comes a string; out comes the corresponding ScienceProductType, if any.
Or maybe it's already a ScienceProductType, in which case we can just return it.
def __init__(self, sp_type: ScienceProductType, filename: str, locator: str, group_with: str):
self.sp_type = sp_type
self.filename = filename
# product locator, used for input groups; locator for a known science product
self.locator = locator
# for "late" science products; they get added to an existing output group
self.group_with = group_with
:param sp_type_in: a string that "should" represent a ScienceProductType
:return:
"""
if isinstance(sp_type_in, ScienceProductType):
return sp_type_in
for spt in ScienceProductType:
if spt.value == sp_type_in:
return spt
raise ValueError(
f"unrecognized ScienceProductType: {sp_type_in}; it's a {type(sp_type_in)}"
)
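The from_str helper at a glance (a sketch):

ScienceProductType.from_str("calibration")             # -> ScienceProductType.EVLA_CAL
ScienceProductType.from_str(ScienceProductType.IMAGE)  # already an enum member: returned as-is
ScienceProductType.from_str("bogus")                   # raises ValueError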
class AncillaryProductType:
class AncillaryProductType(Enum):
"""The various types of ancillary products we'll encounter"""
INGESTION_ARTIFACTS = "ingestion_artifacts"
......@@ -92,6 +119,9 @@ class AncillaryProductType:
CANDIDATE_IMG = "candidate_image"
THUMBNAIL_IMG = "thumbnail_image"
def __str__(self):
return f'"{self.value}"'
class AncillaryProduct:
"""Represents an ancillary product in an ingestion manifest"""
......@@ -105,3 +135,189 @@ class AncillaryProduct:
self.science_associate = science_associate # TODO, possibly: enum?
# make this an ancillary to the group of a science product (assumes locator string)
self.group_with = group_with
def write_ingestion_artifact_tar(ingestion_location: Path, ingestion_files: List[Path]) -> Path:
"""
Take the list of files and build a tar for inclusion into the archive.
This happens in the staging area for ingestion.
:param ingestion_location: path to ingestion location (probably the spool directory)
:param ingestion_files: all the files needed for ingestion
:return: a .tar archive of the ingestion artifacts
"""
ingestion_artifacts_tar = ingestion_location / "ingestion_artifacts.tar"
with tarfile.open(ingestion_artifacts_tar, "w") as ingestion_artifacts_tar:
for file in ingestion_files:
ingestion_artifacts_tar.add(file)
return Path(ingestion_artifacts_tar.name)
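A minimal sketch of the module-level tar helper (paths invented for illustration):

from pathlib import Path

staging = Path("/tmp/staging")  # hypothetical spool directory
files = [staging / "ingestion_manifest_2021_07_01T13_49_17.237.json"]
tar_path = write_ingestion_artifact_tar(staging, files)
# tar_path -> /tmp/staging/ingestion_artifacts.tar, containing the listed files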
class EvlaCalInputScienceProduct:
"""Represents the "science_product" in EvlaCalInputGroup"""
def __init__(self, locator: str):
self.locator = locator
self.type = ScienceProductType.EVLA_CAL
def __json__(self) -> Dict[str, str]:
json_out = self.__dict__
json_out["type"] = ScienceProductType.EVLA_CAL.value
return json_out
class EvlaCalInputGroup:
"""
This represents the starting point for processing which generated a science product.
There is not always an input group for every output group (rawdata, for instance).
Initial assumption: Input groups consist only of science products.
"""
def __init__(self, science_product: EvlaCalInputScienceProduct):
# science product locator
self.science_products = [science_product]
def __json__(self) -> Dict[str, str]:
"""
Create the "input-group" section of the manifest as a JSON string
:return: JSONified InputGroup
"""
json_out = self.__dict__
sps = json_out["science_products"]
sci_prod = sps[0]
sp_str = sci_prod if isinstance(sci_prod, str) else sci_prod.__json__()
json_out["science_products"] = f"[{sp_str}]"
return json_out
class EvlaCalOutputScienceProduct:
"""The science product in the output group"""
def __init__(self, filename: str):
self.filename = filename
self.type = ScienceProductType.EVLA_CAL
def __json__(self) -> Dict[str, str]:
json_out = self.__dict__
json_out["type"] = ScienceProductType.EVLA_CAL.value
return json_out
class Weblog:
"""Represents a weblog.tgz as an ancillary product"""
def __init__(self, weblog_path: Path):
self.ancillary_product = {"type": "weblog", "filename": str(weblog_path)}
def __json__(self) -> Dict[str, str]:
return dict(self.__dict__)
class EvlaCalOutputGroup:
"""Represents result of data processing. Will have a single calibration tar
plus a weblog.
"""
def __init__(self, science_product: EvlaCalOutputScienceProduct, weblog: Path):
self.science_products = [science_product]
self.ancillary_products = [Weblog(weblog)]
def __json__(self) -> Dict[str, str]:
"""
Create the "output-group" section of the manifest as a JSON string.
__json__() will not work; __repr__() is necessary for json.loads() to succeed.
:return: JSONified OutputGroup
"""
json_out = self.__dict__
anc_prod = self.ancillary_products[0]
ap_str = anc_prod if isinstance(anc_prod, str) else anc_prod.__json__()
json_out[IngestionManifestKey.ANCILLARY_PRODUCTS.value] = f"[{ap_str}]"
sci_prod = self.science_products[0]
sp_str = sci_prod if isinstance(sci_prod, str) else sci_prod.__json__()
json_out[IngestionManifestKey.SCIENCE_PRODUCTS.value] = f"[{sp_str}]"
return json_out
class EvlaCalIngestionManifest:
"""TODO: this is JUST the ingestion manifest JSON, not a bespoke object"""
def __init__(self, ingestion_path: Path, spl: str):
"""
:param ingestion_path: staging directory
:param spl: science product locator
"""
self.ingestion_path = ingestion_path
self.locator = spl
self.sp_type = ScienceProductType.EVLA_CAL
self.infiles = [file for file in self.ingestion_path.iterdir()]
self.input_group = EvlaCalInputGroup(EvlaCalInputScienceProduct(self.locator))
self.output_group = self._make_output_group()
def __str__(self):
params = self._make_params_section()
input_group = self.input_group.__json__()
output_group = self.output_group.__json__()
return f"{params}\n{input_group}\n{output_group}"
def _make_params_section(self) -> str:
"""
Create the JSON for the "parameters" section of the manifest.
It's always the same for any EVLA CAL ingestion manifest, except for the ingestion path.
:return: stringified JSON
"""
params = {
"parameters": {
"reingest": "false",
"ngas-ingest": "false",
"calibrate": "false",
"ingestion_path": str(self.ingestion_path),
}
}
return json.dumps(params)
def _make_output_group(self) -> EvlaCalOutputGroup:
"""
Create the JSON for the "output-group" section of the manifest.
        An EVLA CAL output group contains one or more science products and an
        ancillary product (the weblog).
:return: manifest output group
"""
sp_tar = self._find_science_product_tar()
science_product = EvlaCalOutputScienceProduct(str(sp_tar))
        weblog = self.ingestion_path / WEBLOG
if weblog.exists():
return EvlaCalOutputGroup(science_product, weblog)
return EvlaCalOutputGroup(science_product, None)
def _find_science_product_tar(self) -> Path:
"""
        A calibration ingestion staging dir should have ONE science product tar; ignore any others.
        :return: path of the science product tar, or None if none is found
"""
for file in self.infiles:
if re.match(SCIENCE_PRODUCT_PATTERN, file.name):
return file
class IngestionManifestKey(Enum):
"""Sections we expect to see in a manifest"""
INPUT_GROUP = "input_group"
OUTPUT_GROUP = "output_group"
INGESTION_PATH = "ingestion_path"
SCIENCE_PRODUCTS = "science_products"
ANCILLARY_PRODUCTS = "ancillary_products"
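# Hypothetical usage sketch (locator and path are assumptions, not part of this module):
#   manifest = EvlaCalIngestionManifest(Path("/stage/cal_abc123"), "uid://evla/calibration/abc123")
#   (manifest.ingestion_path / "ingestion_manifest.json").write_text(str(manifest))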
......@@ -22,9 +22,8 @@ setup(
url="TBD",
license="GPL",
install_requires=requires,
tests_require=["pytest", "astropy", "pendulum"],
keywords=[],
packages=find_packages(),
classifiers=["Programming Language :: Python :: 3.8"],
entry_points={"console_scripts": ["ingest_envoy = ingest_envoy.main:main"]},
entry_points={"console_scripts": ["ingest_envoy = ingest_envoy.ingest:main"]},
)
#!/usr/bin/env bash
#
# A replacement for the old qaarchive script used by the
# data analysts. This script will tar & zip the pipeline
# weblog into weblog.tgz and then collect everything but
# FITS files in the products directory from a CIPL run into
# a single tar file (naming convention TBD) that is created
# in a storage directory for ingestion.
#
# Arguments:
# 1: Working Directory in qa2 to be worked upon
# 2: CAPO profile used to look up the qa2/spool/staging/storage paths
#
#
# Basics: strict shell options (fail fast, no unset variables, trace execution)
set -o errexit -o nounset -o xtrace
#
# command line argument
#
# The tar file will be named after the working directory it came from
# which preserves the processing time information.
#
SUBDIRECTORY=$1;shift
PROFILE=$1;shift
# Get the spool, staging, and storage paths from CAPO
SPOOL_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.spoolDirectory)
STAGE_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.stageDirectory)
STORE_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.storageDirectory)
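# Illustrative only: capo prints the property's value on stdout, e.g.
#   capo -P dsoc-dev -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.spoolDirectory
# might print a path like /lustre/aoc/cluster/pipeline/dsoc-dev/spool
# (profile name and path here are assumptions, not taken from a real deployment).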
#Yet More VLASS Specialness
if [[ ${PROFILE} != vlass* ]]
then
QA2_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.qaDirectory)
FILENAME=${SUBDIRECTORY}
else
# For VLASS, they don't want the data moved between qa2/ and spool/
QA2_DIR=${SPOOL_DIR}
# They also provide an extra layer of directory within the filename.
IFS='/' # redefine the character on which to split
read -ra COMPONENTS <<< "${SUBDIRECTORY}"
IFS=' ' # reset to default after
# We get: calibration/VLASS1.1_stuff --> FILENAME becomes VLASS1.1_stuff (in line with CIPL)
FILENAME=${COMPONENTS[1]}
fi
# Get the weblog caching directory from CAPO
WEBLOG_CACHE=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.cacheWeblogDirectory)
#
# For the ability to reproduce results, we'll want the PPR.xml file. Ensure it is
# included in the products/ directory:
#
# TODO: Check for base_dir/products/*.pprequest.xml. If it exists, do nothing. If not, use base_dir/PPR.xml
if [ ! -e ${QA2_DIR}/${SUBDIRECTORY}/products/PPR.xml ]
then
cp ${QA2_DIR}/${SUBDIRECTORY}/working/PPR.xml ${QA2_DIR}/${SUBDIRECTORY}/products/PPR.xml
fi
#
# The VLASS project wants the flux.csv file. Check if it's here, if not, check for it in
# the working directory parallel to this one. Don't fail if we can't find it, however (so
# we minimize the disruption to the CIPL system).
#
if [ ! -e ${QA2_DIR}/${SUBDIRECTORY}/products/flux.csv ]
then
if [ -e ${QA2_DIR}/${SUBDIRECTORY}/working/flux.csv ]
then
cp ${QA2_DIR}/${SUBDIRECTORY}/working/flux.csv ${QA2_DIR}/${SUBDIRECTORY}/products/flux.csv
else
echo "No flux.csv file found here or in parallel working directory. Continuing."
fi
fi
#
# Both the pipeline-YYYYMMDDTHHMMSS directory and weblog.tgz should exist. We prefer the
# directory (in case of updates/edits), but fall back on the tgz file.
#
# Check that both are present, as we expect
WEBLOG_DIR=$(ls -t ${QA2_DIR}/${SUBDIRECTORY}/products | grep pipeline- | head -1)
if [ -n "$WEBLOG_DIR" ]
then
# if weblog.tgz exists, we want to remove it
if [ -e ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz ]
then
rm -f ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz
fi
# Tar & Zip the weblog
    # with errexit set, a separate "$?" test would never run after a failure,
    # so test the tar command directly
    if ! tar -C${QA2_DIR}/${SUBDIRECTORY}/products -czf ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz ${WEBLOG_DIR}
    then
        echo "Creation of weblog.tgz failed, exiting"
        exit 1
    fi
else
    # No weblog directory. If there's no weblog.tgz file either, something is wrong: issue a warning
if [ ! -e ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz ]
then
echo "Neither weblog.tgz or the weblog directory exist, continuing"
fi
fi
#
# Sanity checks: create a staging subdirectory for this cal, and if the file already exists, remove it.
#
mkdir -p ${STAGE_DIR}/${SUBDIRECTORY}
if [ -e ${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar ]
then
echo "Calibration Tar File Already Exists! Removing the file for recreation"
#We could rename them with a version ...
#FILENAME=${SUBDIRECTORY}.$(ls -1 ${STAGE_DIR}/${SUBDIRECTORY} | wc -l)
# if we rename it... how do we tell the workflow?
/bin/rm -f ${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar
fi
#
# tar all non-fits and non-weblog-related files into a tar archive in the storage path
# SSA-6115: Don't exclude the weblog.tgz: Users and DAs prefer it bundled in.
#
if ! tar --exclude=\*.fits --exclude=pipeline-\* -C${QA2_DIR}/${SUBDIRECTORY} -cvf ${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar products
then
    echo "Creation of main tar file failed, exiting"
    exit 1
fi
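#
# Illustrative (added note): the resulting ${FILENAME}.tar holds the products/ tree
# minus FITS images and the unpacked pipeline-* weblog directory, e.g.
# products/PPR.xml, products/flux.csv, and products/weblog.tgz.
#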
#
# Copy the weblog over, for ingestion as an ancillary file
#
if ! /bin/cp -f ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz ${STAGE_DIR}/${SUBDIRECTORY}
then
    echo "Copy of the weblog to staging location failed. Exiting."
    exit 1
fi
#
# To stay consistent with current working methods: Copy from STAGE_DIR to STORE_DIR
#
if ! cp ${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar ${STORE_DIR}
then
    # If something goes wrong, make some noise, but continue on.
    echo "Failed to copy the calibration to ${STORE_DIR}, continuing."
fi
# Move subdirectories to the /spool/ copy of this directory
# if it exists, otherwise, just move what we have to /spool/
#
# If this is an ingestion for VLASS, don't move anything.
#
if [[ ${PROFILE} != vlass* ]]
then
if [ -e ${SPOOL_DIR}/${SUBDIRECTORY} ]
then
# Our base directory with the outputlogs is there, move our subdirectories back
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY}/products ${SPOOL_DIR}/${SUBDIRECTORY}
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY}/rawdata ${SPOOL_DIR}/${SUBDIRECTORY}
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY}/working ${SPOOL_DIR}/${SUBDIRECTORY}
# Cleanup the QA2 area
/bin/rm -rf ${QA2_DIR}/${SUBDIRECTORY}
else
#if no old directory exists, just move the whole thing back
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY} ${SPOOL_DIR}
fi
fi