Skip to content
Snippets Groups Projects

integrate curator with envoy for workflow

Merged Charlotte Hausman requested to merge curator-integration into 2.8.2.1-DEVELOMENT
1 unresolved thread
Files
18
@@ -24,14 +24,15 @@ from distutils.util import strtobool
from typing import Tuple, Union
import requests
from ingest_envoy.interfaces import IngestLauncherIF
from ingest_envoy.interfaces import LauncherIF
from ingest_envoy.launchers import (
IngestCalibrationLauncher,
IngestImageLauncher,
IngestObservationLauncher,
CuratorLauncher,
)
from ingest_envoy.solicitor import Solicitor
from ingest_envoy.utilities import IngestType, VLASSIngestType
from ingest_envoy.utilities import IngestType, VLASSIngestType, CuratorType
from pycapo import CapoConfig
"""
@@ -44,7 +45,10 @@ logger.addHandler(logging.StreamHandler(sys.stdout))
def _get_settings(
arg_type: Union[IngestType, VLASSIngestType], filename: str = None, cal_spl: str = None, source_dir: str = None
arg_type: Union[IngestType, VLASSIngestType, CuratorType],
filename: str = None,
cal_spl: str = None,
source_dir: str = None,
) -> dict:
"""
Determine needed settings from metadata.json file
@@ -86,34 +90,57 @@ def arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Workspaces Ingestion System", formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
# ensure we can't try running all types of ingestion at the same time
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--calibration",
nargs=1,
action="store",
required=False,
help="run ingestion for a calibration product",
)
parser.add_argument(
group.add_argument(
"--image",
nargs=1,
action="store",
required=False,
help="run ingestion for an image product",
)
parser.add_argument(
group.add_argument(
"--observation",
nargs=2,
action="store",
required=False,
help="run ingestion for an observation",
)
parser.add_argument(
group.add_argument(
"--seci",
nargs=2,
action="store",
required=False,
help="run ingestion for VLASS SECI image products",
)
# Add subparser for curator commands
subparsers = parser.add_subparsers(
title="Curator Commands",
description="Commands for setting curation types",
help="Options for Curator",
)
curate_parser = subparsers.add_parser("curate")
curate_group = curate_parser.add_mutually_exclusive_group(required="True")
curate_group.add_argument(
"--partial",
nargs=1,
action="store",
help="run partial curation for a product",
)
curate_group.add_argument(
"--full",
nargs=1,
action="store",
help="run full curation for a product",
)
return parser
@@ -125,84 +152,84 @@ def check_ingest_type(args_type: Union[IngestType, VLASSIngestType], parameters:
:param parameters: the parameters determined from metadata.json
:return: boolean representing if the requested ingestion type matches the product type
"""
if "workflowName" not in parameters and args_type == IngestType.OBS:
if "workflowName" not in parameters and (args_type == IngestType.OBS or isinstance(args_type, CuratorType)):
return True
wf_name = parameters["workflowName"]
if args_type.value in wf_name:
return True
elif args_type == IngestType.IMG and "imaging" in wf_name:
if args_type.value in wf_name or (args_type == IngestType.IMG and "imaging" in wf_name):
return True
return False
def set_cal_status_flag(parameters: dict):
    """
    Tell the archive that a calibration ingestion completed so its status flag can be set.

    POSTs the request id and project code to the ``cal-ingestion-complete``
    message-archive endpoint under ``parameters["workflowUrl"]``. A reply other
    than HTTP 200 is only logged as a warning; nothing is raised or returned.

    :param parameters: dictionary of available input parameters; the keys
        "requestId", "workflowUrl" and "project" are read
    :return:
    """
    request_id = parameters["requestId"]
    endpoint = parameters["workflowUrl"] + f"/workflows/requests/message-archive/cal-ingestion-complete"
    body = {"request_id": request_id, "project_code": parameters["project"]}

    reply = requests.post(endpoint, json=body)
    if reply.status_code == http.HTTPStatus.OK:
        return

    # The notification is best effort: ask the operator to finish the job by hand.
    logger.warning(
        f"WARNING: Failed to set calibration status flag in archive for request #{request_id}."
        f" Please set the flag manually!"
    )
def send_aat_image_message(parameters: dict, ingest_type: Union[IngestType, VLASSIngestType]):
    """
    Tell the archive that an image ingestion completed so indexing can be started.

    POSTs to the ``img-ingestion-complete`` message-archive endpoint under
    ``parameters["workflowUrl"]``. VLASS ingestions send only the project code;
    every other ingestion type also sends the request id. A reply other than
    HTTP 200 is only logged as a warning.

    :param parameters: dictionary of available input parameters; "workflowUrl" and
        "project" are always read, "requestId" for non-VLASS types, and "calSpl"
        when the warning is logged
    :param ingest_type: type of ingestion that was run
    :return:
    """
    endpoint = parameters["workflowUrl"] + f"/workflows/requests/message-archive/img-ingestion-complete"

    # VLASS products are reported without a workflow request id.
    body = (
        {"project_code": parameters["project"]}
        if isinstance(ingest_type, VLASSIngestType)
        else {"request_id": parameters["requestId"], "project_code": parameters["project"]}
    )

    reply = requests.post(endpoint, json=body)
    if reply.status_code == http.HTTPStatus.OK:
        return

    logger.warning(
        f"WARNING: Failed to send image ingestion complete to archive for image associated with "
        f"calibration {parameters['calSpl']} and project {parameters['project']}."
        f" Please set off the index manually!"
    )
def send_aat_obs_message(parameters: dict):
def send_indexer_message(parameters: dict, ingest_type: Union[IngestType, VLASSIngestType, CuratorType]):
"""
Send AMQP message to AAT to set off observation (VLBA) indexing
Send AMQP message to AAT to set off indexing
:param parameters: dictionary of available input parameters
:param ingest_type: type of ingestion or curation that was run
:return:
"""
url = parameters["workflowUrl"] + f"/workflows/requests/message-archive/obs-ingestion-complete"
payload = {"project_code": parameters["project"]}
msg_suffix = "-ingestion-complete"
match ingest_type:
case IngestType.CAL:
msg = "cal" + msg_suffix
request_id = parameters["requestId"]
payload = {"request_id": request_id, "project_code": parameters["project"]}
warn_log = (
f"Failed to send calibration ingestion complete to archive for setting calibration status "
f"flag and indexing of request #{request_id}."
)
case IngestType.IMG:
msg = "img" + msg_suffix
payload = {"request_id": parameters["requestId"], "project_code": parameters["project"]}
warn_log = (
f"Failed to send image ingestion complete to archive for image associated with "
f"calibration {parameters['calSpl']} and project {parameters['project']}."
)
case IngestType.OBS:
msg = "obs" + msg_suffix
payload = {
"project_code": parameters["project"],
"telescope": parameters["telescope"],
}
warn_log = (
f"Failed to send observation ingestion complete to archive for observation {parameters['filename']}."
)
case VLASSIngestType.SECI | VLASSIngestType.QUICKLOOK | VLASSIngestType.CUBE:
msg = "img" + msg_suffix
payload = {"project_code": parameters["project"]}
warn_log = (
f"Failed to send image ingestion complete to archive for image(s) associated with "
f"calibration {parameters['calSpl']} and project {parameters['project']}."
)
case CuratorType.FULL | CuratorType.PARTIAL:
# TODO: figure out how to handle this with non-EB products
msg = "obs" + msg_suffix
telescope = parameters["telescope"]
payload = {
"project_code": parameters["project"],
"telescope": telescope,
}
warn_log = f"Failed to send {parameters['product_type']} curation complete to archive."
case _:
msg = ""
payload = {}
warn_log = ""
logger.error(f"Error: Unknown ingest type {ingest_type}, can't send index message to AAT.")
url = parameters["workflowUrl"] + f"/workflows/requests/message-archive/{msg}"
response = requests.post(url, json=payload)
if response.status_code != http.HTTPStatus.OK:
logger.warning(
f"WARNING: Failed to send observation ingestion complete to archive for observation associated with "
f"project {parameters['project']}."
f" Please set off the index manually!"
)
logger.warning(f"WARNING: {warn_log} Please set off the indexer manually")
def _determine_ingestion_info(
args: argparse.Namespace,
) -> Tuple[Union[IngestType, VLASSIngestType], dict, IngestLauncherIF]:
) -> Tuple[Union[IngestType, VLASSIngestType, CuratorType], dict, LauncherIF]:
"""
Determine parameters and launcher needed for ingestion
@@ -216,7 +243,7 @@ def _determine_ingestion_info(
if args.calibration is not None:
arg_type = IngestType.CAL
parameters = _get_settings(arg_type, filename=args.calibration[0])
launcher = IngestCalibrationLauncher(parameters)
launcher = IngestCalibrationLauncher(arg_type, parameters)
elif args.image is not None:
arg_type = IngestType.IMG
parameters = _get_settings(arg_type, filename=args.image[0])
@@ -225,10 +252,18 @@ def _determine_ingestion_info(
arg_type = VLASSIngestType.SECI
parameters = _get_settings(arg_type, cal_spl=args.seci[0], source_dir=args.seci[1])
launcher = IngestImageLauncher(arg_type, parameters)
if args.observation is not None:
elif args.observation is not None:
arg_type = IngestType.OBS
parameters = _get_settings(arg_type, filename=args.observation[0], source_dir=args.observation[1])
launcher = IngestObservationLauncher(parameters)
launcher = IngestObservationLauncher(arg_type, parameters)
elif args.partial is not None:
arg_type = CuratorType.PARTIAL
parameters = _get_settings(arg_type, filename=args.partial[0])
launcher = CuratorLauncher(arg_type, parameters)
elif args.full is not None:
arg_type = CuratorType.FULL
parameters = _get_settings(arg_type, filename=args.full[0])
launcher = CuratorLauncher(arg_type, parameters)
return arg_type, parameters, launcher
@@ -244,22 +279,20 @@ def main():
arg_type, parameters, launcher = _determine_ingestion_info(args)
if check_ingest_type(args_type=arg_type, parameters=parameters):
result = launcher.launch_ingestion()
result = launcher.launch()
if result == 0:
# Ingestion succeeded
logger.info("Ingestion finished successfully! Sending status message to archive...")
if arg_type == IngestType.CAL:
set_cal_status_flag(parameters)
elif arg_type == IngestType.OBS:
send_aat_obs_message(parameters)
else:
send_aat_image_message(parameters, arg_type)
logger.info(
f"{'Curation' if isinstance(arg_type, CuratorType) else 'Ingestion'} finished successfully! "
f"Sending status message to archive..."
)
send_indexer_message(parameters, arg_type)
else:
logger.error(f"ERROR: {arg_type} ingestion failure! Please check logs")
logger.error(f"ERROR: {arg_type} failure! Please check logs")
sys.exit(1)
else:
logger.error(
f"ERROR: The workflow request to be ingested does not match specified "
f"ERROR: The workflow request to be ingested or curated does not match specified "
f"ingestion type {arg_type}. Exiting...."
)
sys.exit(1)
Loading