Commit 54b6cb59 authored by Daniel Lyons

Rewrite deliver-cipl in Python

parent 6f67c962
Merge request !304: Rewrite deliver-cipl in Python
Pipeline #1990 passed with warnings
Showing 565 additions and 4 deletions
@@ -9,6 +9,7 @@
.gitlab-ci.yml
/tmp
/delivery_root
/lustre
.coverage
.coveragerc
/docs
\ No newline at end of file
/docs
@@ -78,3 +78,4 @@ services/**/**/shared
services/**/**/testing
tmp
delivery_root
lustre/
@@ -6,7 +6,6 @@ import argparse
import logging
import pathlib
import re
import shutil
import subprocess
import glob
@@ -160,7 +159,7 @@ def main():
os.chdir(path)
else:
logger.info("RUNNING VELA!")
subprocess.Popen(["vela", "--standard-cal", metadata, ppr])
subprocess.run(["./vela", "--standard-cal", metadata, ppr])
else:
logger.info("TYPE ERROR: Provided SPL is not type execution block!")
sys.exit(1)
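For context on the change above from subprocess.Popen to subprocess.run: Popen starts the child process and returns immediately, while run waits for the command to finish and returns its result. A minimal sketch of the difference, using a placeholder echo command rather than the real vela invocation:

import subprocess

# Popen returns a handle right away; the caller must wait explicitly.
proc = subprocess.Popen(["echo", "background"])
proc.wait()

# run blocks until the command exits and returns a CompletedProcess.
result = subprocess.run(["echo", "blocking"])
print(result.returncode)  # 0 on success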
# Workspaces Standard Capability Delivery/Post QA System
Delivery system for standard calibration and standard imaging capabilities.
It is anticipated that post QA operations for these capabilities will also occur here.
"""
Conveyor is responsible for moving or copying results from standard capability requests to the DA QA area ("delivery") and back again ("post QA").
"""
import pathlib
import sys
from typing import Dict
from pycapo import CapoConfig
import argparse
import logging
from .deliver import DeliveryConveyor
from .retrieve import RetrievalConveyor
from .solicitor import Solicitor
logger = logging.getLogger("conveyor")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
def arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Workspaces Standard Capability Postprocessing",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--deliver",
nargs=1,
action="store",
required=False,
help="deliver standard type capability directories to analyst QA area",
)
parser.add_argument(
"--retrieve",
nargs=1,
action="store",
required=False,
help="return standard type capability directories to original parent directory",
)
return parser
def _get_settings(filename: str) -> Dict[str, str]:
delivery_area = (
CapoConfig().settings("edu.nrao.archive.workspaces.DeliverySettings").ciplDelivery
)
weblog_cache_area = (
CapoConfig().settings("edu.nrao.archive.workspaces.DeliverySettings").cacheWeblogDirectory
)
workspaces_lustre_root_dir = (
CapoConfig().settings("edu.nrao.archive.workspaces.ProcessingSettings").rootDirectory
)
current_root_directory = str(pathlib.Path.cwd().parent)
current_subdirectory = str(pathlib.Path.cwd().stem)
destination_dir = Solicitor(filename=filename).solicit_delivery_location()
destination_subdir = pathlib.Path(destination_dir).stem
return {
"qa_delivery_area": delivery_area,
"weblog_cache_area": weblog_cache_area,
"workspaces_lustre_root_dir": workspaces_lustre_root_dir,
"current_root_directory": current_root_directory,
"current_subdirectory": current_subdirectory,
"destination_dir": destination_dir,
"destination_subdir": destination_subdir,
}
def main():
args = arg_parser().parse_args()
action = None
conveyor = None
if args.deliver is not None:
action = "Delivery"
settings = _get_settings(args.deliver[0])
conveyor = DeliveryConveyor(settings)
elif args.retrieve is not None:
action = "Retrieval"
settings = _get_settings(args.retrieve[0])
conveyor = RetrievalConveyor(settings)
conveyor.convey()
logger.info(f"Standard Calibration {action} is complete!")
import glob
import logging
import os
import pathlib
import shutil
import sys
import tarfile
from typing import Dict
from .interfaces import ConveyorIF
"""
Deliver the weblog and standard capability (i.e., standard calibration or imaging) results to the QA area for analyst access
"""
vlapipe_id = 6000
class DeliveryConveyor(ConveyorIF):
def __init__(self, settings: Dict[str, str]):
self.logger = logging.getLogger("conveyor")
self.settings = settings
def delivery(self):
self.cache_weblog()
self.move_subdirectories_to_qa2()
def cache_weblog(self):
self.logger.info("Caching Weblog!")
weblog_location = (
self.settings["weblog_cache_area"] + "/" + self.settings["destination_subdir"]
)
# make a new cache directory named after the workspaces parent directory and extract the weblog into it
os.makedirs(weblog_location)
if pathlib.Path("./products/weblog.tgz").exists():
# shutil.copy("./products/weblog.tgz", weblog_location)
tar = tarfile.open("./products/weblog.tgz")
# extract to cache area
tar.extractall(path=weblog_location)
# extract in place
tar.extractall(path="./products")
tar.close()
elif self._get_pipeline_dir():
self.logger.info("WARNING! No weblog.tgz, copying pipeline directory instead.")
shutil.copytree("./products/pipeline-*", weblog_location)
else:
self.logger.info("ERROR: Neither weblog directory nor tar file found! Exiting.")
sys.exit(1)
@staticmethod
def _get_pipeline_dir() -> bool:
if not glob.glob("./products/pipeline-*"):
return False
return True
def move_subdirectories_to_qa2(self):
self.logger.info("Moving subdirectories to QA area!")
qa_path = self.settings["qa_delivery_area"] + "/" + self.settings["destination_subdir"]
if not pathlib.Path(qa_path).exists():
os.makedirs(qa_path)
os.chown(qa_path, vlapipe_id, vlapipe_id)
else:
self.logger.info(f"Delivering to existing {qa_path}:")
self.logger.info(os.listdir(qa_path))
contents = glob.glob("*/", recursive=True)
# deliver all directories before creating symlinks - ensures delivery if links fail
for directory in contents:
src = "./" + directory
dest = qa_path + "/" + directory
shutil.copytree(src, dest)
# link results back to intended destination
for directory in contents:
src = qa_path + "/" + directory
os.system(f"chown -R {vlapipe_id}:{vlapipe_id} {src}")
self.symlink_qa(directory, src)
def symlink_qa(self, dir_name: str, src: pathlib.Path):
ws_lustre_spool = self.settings["destination_dir"] + "/" + dir_name
self.logger.info(f"Linking QA2 and workspaces parent directory for directory {dir_name}!")
pathlib.Path(ws_lustre_spool).symlink_to(src, target_is_directory=True)
self.logger.info("Symlink Created!")
def convey(self):
self.logger.info("RUNNING STANDARD CALIBRATION DELIVERY! ")
self.delivery()
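The core of the delivery above is a copy-then-link pattern: results are copied into the QA area first, and only then is the workspaces spool pointed back at the QA copy via symlinks. A minimal sketch with hypothetical paths (the real paths come from the CAPO DeliverySettings), omitting ownership changes and error handling:

import pathlib
import shutil

qa_copy = pathlib.Path("/tmp/qa2/tmpabcd1234/products")       # hypothetical QA destination
shutil.copytree("./products", qa_copy)                         # 1. copy the results first
spool_link = pathlib.Path("/tmp/spool/tmpabcd1234/products")   # hypothetical spool location
spool_link.symlink_to(qa_copy, target_is_directory=True)       # 2. link the spool back to the QA copy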
"""
Interfaces for conveyor
"""
from abc import ABC
class ConveyorIF(ABC):
"""
Generic conveyor methods
"""
def convey(self):
raise NotImplementedError
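Any additional conveyor type only needs to subclass ConveyorIF and override convey(); a hypothetical example:

from conveyor.interfaces import ConveyorIF

class DryRunConveyor(ConveyorIF):
    """Hypothetical conveyor that only reports what it would move."""

    def convey(self):
        print("dry run: nothing moved")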
import glob
import logging
import os
import shutil
from pathlib import Path
from typing import Dict, List
from .interfaces import ConveyorIF
"""
Post QA: Retrieve results directories from QA2 area to the spool area
"""
class RetrievalConveyor(ConveyorIF):
def __init__(self, settings: Dict[str, str]):
self.logger = logging.getLogger("conveyor")
self.settings = settings
def retrieval(self):
qa_path = self.determine_qa_directory()
spool_path = self.determine_spool_directory()
os.chdir(spool_path)
contents = glob.glob("*/", recursive=True)
self.break_symlinks(spool_path, contents)
self.move_subdirectories_to_spool(qa_path, spool_path, contents)
check = self.check_spool_contents(spool_path, contents)
if check:
Path(qa_path).rmdir()
def break_symlinks(self, spool_path: Path, dir_list: List[str]):
self.logger.info(f"Breaking symlinks between qa2 and spool for directory {spool_path.stem}...")
for directory in dir_list:
directory = directory.strip("/")
if Path(spool_path / directory).is_symlink():
Path(spool_path / directory).unlink()
self.logger.info(f"Symlink for {directory} removed.")
else:
self.logger.info(f"Directory {directory} is not a symlink! Skipping.")
def move_subdirectories_to_spool(
self,
qa_path: Path,
spool_path: Path,
dir_list: List[str]
):
self.logger.info(f"Moving directories from qa2 to spool for directory {qa_path.stem}...")
for directory in dir_list:
qa_dir = qa_path / directory
spool_dir = spool_path / directory
self.logger.info(f"Moving directory {directory}...")
shutil.move(qa_dir, spool_dir)
self.logger.info("Done.")
def determine_qa_directory(self) -> Path:
qa_area = self.settings["qa_delivery_area"]
wf_dir = self.settings["destination_subdir"]
return Path(qa_area + "/" + wf_dir)
def determine_spool_directory(self) -> Path:
spool_area = self.settings["workspaces_lustre_root_dir"]
wf_dir = self.settings["destination_subdir"]
return Path(spool_area + "/" + wf_dir)
def check_spool_contents(self, spool_path: Path, expected_dirs: List[str]) -> bool:
os.chdir(spool_path)
contents = glob.glob("*/", recursive=True)
if set(contents).issubset(expected_dirs) and len(contents) >= len(expected_dirs):
self.logger.info("All directories were successfully transferred!")
return True
else:
self.logger.error(f"ERROR: A directory seems to be missing! "
f"Directory list {contents} does not match the "
f"list of expected directories!")
return False
def convey(self):
self.logger.info("RUNNING POST QA STANDARD CALIBRATION DIRECTORY RETRIEVAL!")
self.retrieval()
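The completeness check in check_spool_contents boils down to a set comparison: every directory found in the spool must be one of the expected directories, and none may be missing. A small illustration with example directory lists:

expected = ["working/", "rawdata/", "products/"]
found = ["working/", "rawdata/", "products/"]

complete = set(found).issubset(expected) and len(found) >= len(expected)
print(complete)  # True; becomes False if any expected directory is absent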
"""
Solicitor: uses metadata.json to determine delivery locations
"""
import json
class Solicitor:
def __init__(self, filename: str):
self.filename = filename
def solicit_delivery_location(self):
with open(self.filename) as file:
metadata = json.loads(file.read())
return metadata["destinationDirectory"]
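Usage is a single call: Solicitor reads the metadata JSON and returns its destinationDirectory field, as with the test/test.json fixture included in this commit:

from conveyor.solicitor import Solicitor

location = Solicitor("test/test.json").solicit_delivery_location()
print(location)  # /lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpabcd1234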
#!/usr/bin/python
# -*- coding: utf-8 -*-
from pathlib import Path
from setuptools import find_packages, setup
VERSION = open("conveyor/_version.py").readlines()[-1].split()[-1].strip("\"'")
README = Path("README.md").read_text()
setup(
name="ssa-" + Path().absolute().name,
version=VERSION,
description="NRAO Workspaces Standard Capability Delivery/Post QA System",
long_description=README,
author="NRAO SSA Team",
author_email="dms-ssa@nrao.edu",
url="TBD",
license="GPL",
install_requires=["pex==2.1.41", "pycapo==0.3.1"],
tests_require=["pytest>=5.4,<6.0", "pytest-resource-path"],
keywords=[],
packages=find_packages(),
classifiers=["Programming Language :: Python :: 3.8"],
entry_points={"console_scripts": ["conveyor = conveyor.conveyor:main"]},
)
{
"fileSetIds": "brain_000.58099.67095825232",
"workflowName": "std_calibration",
"systemId": "2",
"productLocator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
"projectMetadata": {
"projectCode": "Operations",
"title": "",
"startTime": "58099.6710792824",
"observer": "VLA Operations"
},
"destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpabcd1234"
}
"""
Tests for conveyor.conveyor module
"""
import argparse
from unittest.mock import patch, MagicMock
import conveyor.conveyor as con
from conveyor.deliver import DeliveryConveyor
from conveyor.retrieve import RetrievalConveyor
expected_settings = {
"qa_delivery_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/qa2",
"weblog_cache_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/cache/weblog",
"workspaces_lustre_root_dir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool",
"current_root_directory": "current/directory",
"current_subdirectory": "dir_123",
"destination_dir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpabcd1234",
"destination_subdir": "tmpabcd1234",
}
args = argparse.Namespace()
class TestConveyor:
def test_get_settings(self):
with patch("pathlib.Path.cwd") as cwd:
settings = con._get_settings("test/test.json")
assert settings["qa_delivery_area"] == expected_settings["qa_delivery_area"]
assert settings["weblog_cache_area"] == expected_settings["weblog_cache_area"]
assert settings["workspaces_lustre_root_dir"] == expected_settings["workspaces_lustre_root_dir"]
# mock calls to cwd and count
assert cwd.call_count == 2
assert settings["destination_dir"] == expected_settings["destination_dir"]
assert settings["destination_subdir"] == expected_settings["destination_subdir"]
def test_main_deliver(self):
args.deliver = ['test/test.json']
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
assert args.deliver[0] == "test/test.json"
with patch("conveyor.deliver.DeliveryConveyor.convey") as del_convey:
con.main()
assert del_convey.call_count == 1
# reset for other testing
args.deliver = None
def test_main_retrieve(self):
args.retrieve = ['test/test.json']
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
with patch("conveyor.retrieve.RetrievalConveyor.convey") as ret_convey:
con.main()
assert ret_convey.call_count == 1
# reset for other testing
args.retrieve = None
"""
Tests for conveyor.deliver module
"""
from unittest.mock import patch, MagicMock
from conveyor.deliver import DeliveryConveyor
settings = {
"qa_delivery_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/qa2",
"weblog_cache_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/cache/weblog",
"workspaces_lustre_root_dir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool",
"current_root_directory": "current/directory",
"current_subdirectory": "dir_123",
"destination_dir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpabcd1234",
"destination_subdir": "tmpabcd1234",
}
conveyor = DeliveryConveyor(settings=settings)
class TestDeliveryConveyor:
def test_delivery(self):
with patch("conveyor.deliver.DeliveryConveyor.cache_weblog") as weblog:
with patch("conveyor.deliver.DeliveryConveyor.move_subdirectories_to_qa2") as qa2:
conveyor.delivery()
assert weblog.call_count == 1
assert qa2.call_count == 1
@patch("sys.exit")
@patch("shutil.copytree")
@patch("tarfile.open")
def test_cache_weblog(self, mock_tar, mock_shutil, mock_exit):
with patch("os.makedirs") as make_dirs:
with patch("conveyor.deliver.DeliveryConveyor._get_pipeline_dir") as pipeline:
conveyor.cache_weblog()
assert make_dirs.call_count == 1
assert mock_shutil.call_count == 1
def test_get_pipeline_dir(self):
with patch("glob.glob") as glob:
choice = conveyor._get_pipeline_dir()
assert glob.call_count == 1
assert choice is True
@patch("os.symlink")
@patch("os.system")
@patch("shutil.copytree")
@patch("os.chown")
@patch("os.makedirs")
def test_move_subdirectories_to_qa2(self, mock_dirs, mock_chown, mock_copy,
mock_system, mock_link):
with patch("glob.glob", MagicMock(return_value=["rawdata/", "working/", "products/"])) as contents:
conveyor.move_subdirectories_to_qa2()
assert mock_dirs.call_count == 1
assert mock_chown.call_count == 1
assert mock_copy.call_count == 3
assert mock_system.call_count == 3
assert mock_link.call_count == 3
@patch("pathlib.Path.symlink_to")
def test_symlink_qa(self, mock_link):
conveyor.symlink_qa(settings["destination_subdir"], settings["qa_delivery_area"])
assert mock_link.call_count == 1
def test_convey(self):
with patch("conveyor.deliver.DeliveryConveyor.delivery") as delivery:
conveyor.convey()
assert delivery.call_count == 1
"""
Tests for conveyor.retrieve module
"""
from pathlib import Path
from unittest.mock import patch
from conveyor.retrieve import RetrievalConveyor
settings = {
"qa_delivery_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/qa2",
"weblog_cache_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/cache/weblog",
"workspaces_lustre_root_dir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool",
"current_root_directory": "current/directory",
"current_subdirectory": "dir_123",
"destination_dir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpabcd1234",
"destination_subdir": "tmpabcd1234",
}
conveyor = RetrievalConveyor(settings=settings)
def fake_determine_qa():
qa_area = settings["qa_delivery_area"]
wf_dir = settings["destination_subdir"]
return Path(qa_area + "/" + wf_dir)
def fake_determine_spool():
spool_area = settings["workspaces_lustre_root_dir"]
wf_dir = settings["destination_subdir"]
return Path(spool_area + "/" + wf_dir)
class TestRetrievalConveyor:
@patch("pathlib.Path.rmdir")
@patch("glob.glob")
@patch("os.chdir")
def test_retrieval(self, mock_chdir, mock_glob, mock_rm):
with patch("conveyor.retrieve.RetrievalConveyor.break_symlinks") as symlinks:
with patch("conveyor.retrieve.RetrievalConveyor.move_subdirectories_to_spool") as move:
with patch("conveyor.retrieve.RetrievalConveyor.check_spool_contents") as contents:
conveyor.retrieval()
assert mock_chdir.call_count == 1
assert mock_glob.call_count == 1
assert symlinks.call_count == 1
assert move.call_count == 1
assert contents.call_count == 1
assert mock_rm.call_count == 1
@patch("pathlib.Path.is_symlink", return_value=True)
@patch("pathlib.Path.unlink")
def test_break_symlinks(self, mock_unlink, mock_sym):
path = fake_determine_spool()
dir_list = ["working/", "rawdata/", "products/"]
conveyor.break_symlinks(path, dir_list)
assert mock_sym.call_count == 3
assert mock_unlink.call_count == 3
@patch("shutil.move")
def test_move_subdirectories_to_spool(self, mock_move):
qa_path = fake_determine_qa()
spool_path = fake_determine_spool()
dir_list = ["working/", "rawdata/", "products/"]
conveyor.move_subdirectories_to_spool(qa_path, spool_path, dir_list)
assert mock_move.call_count == 3
def test_determine_qa_directory(self):
path = conveyor.determine_qa_directory()
assert isinstance(path, Path)
def test_determine_spool_directory(self):
path = conveyor.determine_spool_directory()
assert isinstance(path, Path)
@patch("glob.glob", return_value=["working/", "rawdata/", "products/"])
@patch("os.chdir")
def test_check_spool_contents(self, mock_chdir, mock_glob):
spool = fake_determine_spool()
expected_list = ["working/", "rawdata/", "products/"]
check = conveyor.check_spool_contents(spool, expected_list)
assert check is True
@patch("glob.glob", return_value=["working/", "products/"])
@patch("os.chdir")
def test_check_spool_contents_missing(self, mock_chdir, mock_glob):
spool = fake_determine_spool()
expected_list = ["working/", "rawdata/", "products/"]
check = conveyor.check_spool_contents(spool, expected_list)
assert check is False
def test_convey(self):
with patch("conveyor.retrieve.RetrievalConveyor.retrieval") as retrieval:
conveyor.convey()
assert retrieval.call_count == 1
"""
Tests for conveyor.solicitor module
"""
from conveyor.solicitor import Solicitor
solicitor = Solicitor("test/test.json")
expected_delivery = "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmpabcd1234"
class TestSolicitor:
def test_solicitor(self):
location = solicitor.solicit_delivery_location()
assert location == expected_delivery
@@ -8,7 +8,7 @@ from setuptools import find_packages, setup
VERSION = open("vela/_version.py").readlines()[-1].split()[-1].strip("\"'")
README = Path("README.md").read_text()
requires = ["pycapo", "bs4", "lxml", "pex==2.1.41"]
requires = ["pycapo", "bs4", "lxml", "pex==2.1.41", "pendulum", "argparse"]
setup(
name="ssa-" + Path().absolute().name,