Commit 1c37e596 authored by Charlotte Hausman

Image Ingestion Workflow Part 1

parent 0a9ad61d
1 merge request: !376 Image Ingestion Workflow Part 1
Pipeline #2320 passed
@@ -45,6 +45,13 @@ def arg_parser() -> argparse.ArgumentParser:
required=False,
help="deliver standard image results to analyst image QA area",
)
parser.add_argument(
"--retrieve-img",
nargs=1,
action="store",
required=False,
help="retrieve standard image results to original parent directory",
)
return parser
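For orientation, a minimal sketch (not part of the commit) of how the new flag surfaces once parsed, assuming arg_parser() above is importable; with nargs=1 the value arrives as a one-element list, which is why main() below indexes args.retrieve_img[0]. The example argument mirrors the metadata.json passed by the condor template later in this commit.
# Illustration only: exercise the parser defined above.
parser = arg_parser()
args = parser.parse_args(["--retrieve-img", "metadata.json"])
print(args.retrieve_img)     # ['metadata.json'] -- a one-element list
print(args.retrieve_img[0])  # 'metadata.json', which main() hands to _get_settings()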
@@ -91,6 +98,10 @@ def main():
action = "Image Delivery"
settings = _get_settings(args.deliver_img[0])
conveyor = DeliveryConveyor(settings, action)
elif args.retrieve_img is not None:
action = "Image Retrieval"
settings = _get_settings(args.retrieve_img[0])
conveyor = RetrievalConveyor(settings, action)
conveyor.convey()
logger.info(f"Standard {action} is complete!")
@@ -27,22 +27,28 @@ class RetrievalConveyor(ConveyorIF):
contents = glob.glob("*/", recursive=True)
self.break_symlinks(spool_path, contents)
self.move_subdirectories_to_spool(qa_path, spool_path, contents)
check = self.check_spool_contents(spool_path, contents)
if check:
Path(qa_path).rmdir()
if qa_path.exists():
self.move_subdirectories_to_spool(qa_path, spool_path, contents)
check = self.check_spool_contents(spool_path, contents)
if check:
Path(qa_path).rmdir()
else:
self.logger.info(
f"Directories from {qa_path} have already been retrieved! Continuing..."
)
def break_symlinks(self, spool_path: Path, dir_list: List[str]):
self.logger.info(
f"Breaking symlinks between qa2 and spool for directory {spool_path.stem}..."
)
for directory in dir_list:
directory = directory.strip("/")
if Path(spool_path / directory).is_symlink():
Path(spool_path / directory).unlink()
self.logger.info(f"Symlink for {directory} removed.")
self.logger.info(f'Symlink for "{directory}" removed.')
else:
self.logger.info(f"Directory {directory} is not a symlink! Skipping.")
self.logger.info(f'Directory "{directory}" is not a symlink! Skipping.')
def move_subdirectories_to_spool(self, qa_path: Path, spool_path: Path, dir_list: List[str]):
self.logger.info(f"Moving directories from qa2 to spool for directory {qa_path.stem}...")
@@ -54,7 +60,11 @@ class RetrievalConveyor(ConveyorIF):
self.logger.info("Done.")
def determine_qa_directory(self) -> Path:
qa_area = self.settings["qa_delivery_area"]
if "Calibration" in self.action:
qa_area = self.settings["qa_delivery_area"]
else:
qa_area = self.settings["image_qa_area"]
wf_dir = self.settings["destination_subdir"]
return Path(qa_area + "/" + wf_dir)
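A standalone sketch of the branching added to determine_qa_directory(); only the settings keys and the "Image Retrieval" action string come from this diff, while the paths and the calibration action string are assumptions for illustration.
from pathlib import Path

# Hypothetical settings values; only the key names appear in the diff above.
settings = {
    "qa_delivery_area": "/lustre/qa2/delivery",
    "image_qa_area": "/lustre/qa2/images",
    "destination_subdir": "tmpabc123",
}

def qa_directory(action: str) -> Path:
    # Calibration retrievals keep using the delivery area; image retrievals
    # go to the new image QA area.
    area = settings["qa_delivery_area"] if "Calibration" in action else settings["image_qa_area"]
    return Path(area + "/" + settings["destination_subdir"])

print(qa_directory("Calibration Retrieval"))  # /lustre/qa2/delivery/tmpabc123
print(qa_directory("Image Retrieval"))        # /lustre/qa2/images/tmpabc123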
@@ -79,5 +89,5 @@ class RetrievalConveyor(ConveyorIF):
return False
def convey(self):
self.logger.info("RUNNING POST QA STANDARD CALIBRATION DIRECTORY RETRIEVAL!")
self.logger.info(f"RUNNING POST QA STANDARD {self.action.upper()}!")
self.retrieval()
@@ -45,9 +45,6 @@ class TestRetrievalConveyor:
assert mock_chdir.call_count == 1
assert mock_glob.call_count == 1
assert symlinks.call_count == 1
assert move.call_count == 1
assert contents.call_count == 1
assert mock_rm.call_count == 1
@patch("pathlib.Path.is_symlink", return_value=True)
@patch("pathlib.Path.unlink")
......
"""
Adapted from shared/system to avoid including ssa-workspaces in pex
"""
from __future__ import annotations

from pathlib import Path
class JSONSerializable:
def __json__(self, request=None) -> dict:
"""
Allows this object to be converted to JSON
:param request: this parameter is the active Pyramid request, if applicable (None otherwise)
:return: a dictionary which can be converted to JSON using json.dump
"""
pass
@classmethod
def from_json(cls, json: dict) -> any:
pass
class AbstractTextFile(JSONSerializable):
"""
Abstract text file is exactly that, an abstract concept of what a file is, to be
returned from various non-filesystem places.
"""
def __init__(self, filename: str, content: str):
self.filename, self.content = filename, content
def write_to(self, directory: Path):
(directory / self.filename).write_text(self.content)
@classmethod
def from_path(cls, path: Path) -> AbstractTextFile:
return cls(path.name, path.read_text())
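A brief usage sketch of AbstractTextFile, assuming the class above is importable; the filename and content here are invented for illustration.
from pathlib import Path

# Round-trip: write an in-memory file to disk, then read it back.
f = AbstractTextFile("example.json", '{"product": "image"}')
f.write_to(Path("/tmp"))
again = AbstractTextFile.from_path(Path("/tmp") / "example.json")
assert again.filename == f.filename and again.content == f.content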
@@ -29,11 +29,6 @@ class TestIngestCalibrationLauncher:
assert mock_run.call_count == 2
assert mock_manifest.call_count == 1
@patch("subprocess.run")
def test_run_ingest(self, mock_run):
IngestCalibrationLauncher(parameters).run_ingest()
assert mock_run.call_count == 1
@pytest.mark.skip("Skip until manifest builder is complete")
@patch("ingest_envoy.ingestion_manifest.IngestionManifestBuilder.build")
@patch("subprocess.run")
......
"""add ingest_img workflow
Revision ID: dcbfdfafe16c
Revises: f2e524e1e04d
Create Date: 2021-07-27 15:38:06.960178
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "dcbfdfafe16c"
down_revision = "f2e524e1e04d"
branch_labels = None
depends_on = None
condor_content = """executable = ingest_image.sh
arguments = metadata.json
output = ingest.out
error = ingest.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/pycapo, $(SBIN_PATH)/conveyor, $(SBIN_PATH)/ingest_envoy, $(SBIN_PATH)/ingest, $(SBIN_PATH)/image-product-collector.sh, ./metadata.json
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
script_content = """#!/bin/sh
set -o errexit
./conveyor --retrieve-img $1
./ingest_envoy --image $1
"""
def upgrade():
op.execute(
f"""
INSERT INTO workflows (workflow_name)
VALUES ('ingest_image')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('ingest_image.condor', E'{condor_content}', 'ingest_image')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('ingest_image.sh', E'{script_content}', 'ingest_image')
"""
)
def downgrade():
op.execute(
"""
DELETE FROM workflows WHERE workflow_name='ingest_image'
"""
)
op.execute(
"""
DELETE FROM workflow_templates WHERE workflow_name='ingest_image'
"""
)
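To see the migration exercised end to end, a hedged sketch of applying and rolling back this revision programmatically; the alembic.ini path is an assumption, while the revision IDs are the ones declared above.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")             # assumed location of the project's Alembic config
command.upgrade(cfg, "dcbfdfafe16c")    # inserts the ingest_image workflow and its two templates
command.downgrade(cfg, "f2e524e1e04d")  # removes them again via downgrade() above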
@@ -34,9 +34,7 @@ mkdir -p "$STAGING_DIR"
mkdir -p "$STORAGE_DIR"
# Copy wf_framework shell scripts to workflow dir
cp /packages/apps/cli/executables/wf_framework/sh/framework.sh "$WORKFLOW_DIR"
cp /packages/apps/cli/executables/wf_framework/sh/calibration-table-collector.sh "$WORKFLOW_DIR"
cp /packages/apps/cli/executables/wf_framework/sh/ingest-request.sh "$WORKFLOW_DIR"
cp -a /packages/apps/cli/executables/wf_framework/sh/. "$WORKFLOW_DIR"
cp -R /packages/apps/cli/executables/wf_framework/casa_requirements/.matplotlib "$WORKFLOW_DIR"
......
@@ -251,7 +251,9 @@ class WorkflowRequestRestService:
file = lookup_file(request=self.request)
# 2. create ingestion workflow request
ingest_type = "ingest_cal" if "calibration" in self.request.matchdict["name"] else "ingest"
ingest_type = (
"ingest_cal" if "calibration" in self.request.matchdict["name"] else "ingest_image"
)
ingest_request = self.request.info.create_workflow_request(
workflow=ingest_type,
argument={"parent_wf_request_id": self.request.matchdict["request_id"]},
......