Skip to content
Snippets Groups Projects
Commit fdb7a822 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

Image Ingestion workflow

- fix conveyor to allow multiple ingestion attempts
- fix workflow endpoint for image ingestion
- create ingest-image workflow
parent 0a9ad61d
No related branches found
No related tags found
2 merge requests!378Image Ingestion workflow,!376Image Ingestion Workflow Part 1
Pipeline #2316 failed
...@@ -45,6 +45,13 @@ def arg_parser() -> argparse.ArgumentParser: ...@@ -45,6 +45,13 @@ def arg_parser() -> argparse.ArgumentParser:
required=False, required=False,
help="deliver standard image results to analyst image QA area", help="deliver standard image results to analyst image QA area",
) )
parser.add_argument(
"--retrieve-img",
nargs=1,
action="store",
required=False,
help="retrieve standard image results to original parent directory",
)
return parser return parser
...@@ -91,6 +98,10 @@ def main(): ...@@ -91,6 +98,10 @@ def main():
action = "Image Delivery" action = "Image Delivery"
settings = _get_settings(args.deliver_img[0]) settings = _get_settings(args.deliver_img[0])
conveyor = DeliveryConveyor(settings, action) conveyor = DeliveryConveyor(settings, action)
elif args.retrieve_img is not None:
action = "Image Retrieval"
settings = _get_settings(args.retrieve_img[0])
conveyor = RetrievalConveyor(settings, action)
conveyor.convey() conveyor.convey()
logger.info(f"Standard {action} is complete!") logger.info(f"Standard {action} is complete!")
...@@ -27,22 +27,28 @@ class RetrievalConveyor(ConveyorIF): ...@@ -27,22 +27,28 @@ class RetrievalConveyor(ConveyorIF):
contents = glob.glob("*/", recursive=True) contents = glob.glob("*/", recursive=True)
self.break_symlinks(spool_path, contents) self.break_symlinks(spool_path, contents)
self.move_subdirectories_to_spool(qa_path, spool_path, contents) if qa_path.exists():
check = self.check_spool_contents(spool_path, contents) self.move_subdirectories_to_spool(qa_path, spool_path, contents)
if check: check = self.check_spool_contents(spool_path, contents)
Path(qa_path).rmdir() if check:
Path(qa_path).rmdir()
else:
self.logger.info(
f"Directories from {qa_path} have already been retrieved! Continuing..."
)
def break_symlinks(self, spool_path: Path, dir_list: List[str]): def break_symlinks(self, spool_path: Path, dir_list: List[str]):
self.logger.info( self.logger.info(
f"Breaking symlinks between qa2 and spool for directory {spool_path.stem}..." f"Breaking symlinks between qa2 and spool for directory {spool_path.stem}..."
) )
for directory in dir_list: for directory in dir_list:
directory = directory.strip("/") directory = directory.strip("/")
if Path(spool_path / directory).is_symlink(): if Path(spool_path / directory).is_symlink():
Path(spool_path / directory).unlink() Path(spool_path / directory).unlink()
self.logger.info(f"Symlink for {directory} removed.") self.logger.info(f'Symlink for "{directory}" removed.')
else: else:
self.logger.info(f"Directory {directory} is not a symlink! Skipping.") self.logger.info(f'Directory "{directory}" is not a symlink! Skipping.')
def move_subdirectories_to_spool(self, qa_path: Path, spool_path: Path, dir_list: List[str]): def move_subdirectories_to_spool(self, qa_path: Path, spool_path: Path, dir_list: List[str]):
self.logger.info(f"Moving directories from qa2 to spool for directory {qa_path.stem}...") self.logger.info(f"Moving directories from qa2 to spool for directory {qa_path.stem}...")
...@@ -54,7 +60,11 @@ class RetrievalConveyor(ConveyorIF): ...@@ -54,7 +60,11 @@ class RetrievalConveyor(ConveyorIF):
self.logger.info("Done.") self.logger.info("Done.")
def determine_qa_directory(self) -> Path: def determine_qa_directory(self) -> Path:
qa_area = self.settings["qa_delivery_area"] if "Calibration" in self.action:
qa_area = self.settings["qa_delivery_area"]
else:
qa_area = self.settings["image_qa_area"]
wf_dir = self.settings["destination_subdir"] wf_dir = self.settings["destination_subdir"]
return Path(qa_area + "/" + wf_dir) return Path(qa_area + "/" + wf_dir)
...@@ -79,5 +89,5 @@ class RetrievalConveyor(ConveyorIF): ...@@ -79,5 +89,5 @@ class RetrievalConveyor(ConveyorIF):
return False return False
def convey(self): def convey(self):
self.logger.info("RUNNING POST QA STANDARD CALIBRATION DIRECTORY RETRIEVAL!") self.logger.info(f"RUNNING POST QA STANDARD {self.action.upper()}!")
self.retrieval() self.retrieval()
"""add ingest_img workflow
Revision ID: dcbfdfafe16c
Revises: f2e524e1e04d
Create Date: 2021-07-27 15:38:06.960178
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "dcbfdfafe16c"
down_revision = "f2e524e1e04d"
branch_labels = None
depends_on = None
condor_content = """executable = ingest_image.sh
arguments = metadata.json
output = ingest.out
error = ingest.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/pycapo, $(SBIN_PATH)/conveyor, $(SBIN_PATH)/ingest_envoy, $(SBIN_PATH)/ingest, $(SBIN_PATH)/image-product-collector.sh, ./metadata.json
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
script_content = """#!/bin/sh
set -o errexit
./conveyor --retrieve-img $1
./ingest_envoy --image $1
"""
def upgrade():
op.execute(
f"""
INSERT INTO workflows (workflow_name)
VALUES ('ingest_image')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('ingest_image.condor', E'{condor_content}', 'ingest_image')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('ingest_image.sh', E'{script_content}', 'ingest_image')
"""
)
def downgrade():
op.execute(
"""
DELETE FROM workflows WHERE workflow_name='ingest_image'
"""
)
op.execute(
"""
DELETE FROM workflow_templates WHERE workflow_name='ingest_image'
"""
)
...@@ -34,9 +34,7 @@ mkdir -p "$STAGING_DIR" ...@@ -34,9 +34,7 @@ mkdir -p "$STAGING_DIR"
mkdir -p "$STORAGE_DIR" mkdir -p "$STORAGE_DIR"
# Copy wf_framework shell scripts to workflow dir # Copy wf_framework shell scripts to workflow dir
cp /packages/apps/cli/executables/wf_framework/sh/framework.sh "$WORKFLOW_DIR" cp -a /packages/apps/cli/executables/wf_framework/sh/. "$WORKFLOW_DIR"
cp /packages/apps/cli/executables/wf_framework/sh/calibration-table-collector.sh "$WORKFLOW_DIR"
cp /packages/apps/cli/executables/wf_framework/sh/ingest-request.sh "$WORKFLOW_DIR"
cp -R /packages/apps/cli/executables/wf_framework/casa_requirements/.matplotlib "$WORKFLOW_DIR" cp -R /packages/apps/cli/executables/wf_framework/casa_requirements/.matplotlib "$WORKFLOW_DIR"
......
...@@ -251,7 +251,9 @@ class WorkflowRequestRestService: ...@@ -251,7 +251,9 @@ class WorkflowRequestRestService:
file = lookup_file(request=self.request) file = lookup_file(request=self.request)
# 2. create ingestion workflow request # 2. create ingestion workflow request
ingest_type = "ingest_cal" if "calibration" in self.request.matchdict["name"] else "ingest" ingest_type = (
"ingest_cal" if "calibration" in self.request.matchdict["name"] else "ingest_image"
)
ingest_request = self.request.info.create_workflow_request( ingest_request = self.request.info.create_workflow_request(
workflow=ingest_type, workflow=ingest_type,
argument={"parent_wf_request_id": self.request.matchdict["request_id"]}, argument={"parent_wf_request_id": self.request.matchdict["request_id"]},
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment