diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py index 2eb12035e82f443d81cb7cad42b1e48797400343..3962883fc4b34bf8389d581e0a1a570284715551 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py @@ -178,6 +178,26 @@ def send_aat_image_message(parameters: dict, ingest_type: Union[IngestType, VLAS ) +def send_aat_obs_message(parameters: dict): + """ + Send AMQP message to AAT to set off observation (VLBA) indexing + + :param parameters: dictionary of available input parameters + :return: + """ + url = parameters["workflowUrl"] + f"/workflows/requests/message-archive/obs-ingestion-complete" + payload = {"project_code": parameters["project"]} + + response = requests.post(url, json=payload) + + if response.status_code != http.HTTPStatus.OK: + logger.warning( + f"WARNING: Failed to send observation ingestion complete to archive for observation associated with " + f"project {parameters['project']}." + f" Please set off the index manually!" + ) + + def _determine_ingestion_info( args: argparse.Namespace, ) -> Tuple[Union[IngestType, VLASSIngestType], dict, IngestLauncherIF]: @@ -228,6 +248,8 @@ def main(): logger.info("Ingestion finished successfully! Sending status message to archive...") if arg_type == IngestType.CAL: set_cal_status_flag(parameters) + elif arg_type == IngestType.OBS: + send_aat_obs_message(parameters) else: send_aat_image_message(parameters, arg_type) else: diff --git a/shared/workspaces/workspaces/workflow/message_architect.py b/shared/workspaces/workspaces/workflow/message_architect.py index c0d1c96338c0e7dae4bfb07b95d67441c3301cdc..faaa32a631f7300687e4e13d9aca907f8cfab408 100644 --- a/shared/workspaces/workspaces/workflow/message_architect.py +++ b/shared/workspaces/workspaces/workflow/message_architect.py @@ -120,6 +120,16 @@ archive_msg_templates = DictView( "status": "complete", }, }, + "obs_ingestion_complete": { + "application": "ingestor", + "message": "Observation Ingestion Complete", + "request": "observation", + "logData": { + "project_code": None, + "ingestion_type": "vlba", + "status": "complete", + }, + }, "download_complete": { "application": "workflow", "download_url": None, @@ -281,7 +291,7 @@ class ArchiveMessageArchitect(MessageArchitectIF): # image indexing is based on project not request id, use different subject template["subject"] = ( f"WS workflow triggering indexer for {self.project}" - if self.key == "ingestor.images" + if self.key in ["ingestor.images", "ingestion-complete.vlba"] else f"WS workflow request #{self.request.workflow_request_id}" ) template["routing_key"] = self.key diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 6c1a10fba9ff890bd478dc1db26c7b34db90aaf7..c1f52d4f4489bfa393102da67a40c3697c00eb6a 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -318,6 +318,13 @@ class WorkflowService(WorkflowServiceIF): elif msg_type == "img-ingestion-complete": logger.info(f"Sending 'Image Ingestion Complete' indexing message to AAT for project {project_code}!") msg_arch = ArchiveMessageArchitect(routing_key="ingestor.images", request=None, project_code=project_code) + elif msg_type == "obs-ingestion-complete": + logger.info(f"Sending 'Observation Ingestion Complete' indexing message to AAT for project {project_code}!") + msg_arch = ArchiveMessageArchitect( + routing_key="ingestion-complete.vlba", + request=None, + project_code=project_code + ) else: logger.info(f"Cannot determine archive message for type {msg_type}. Skipping....")