From 9485d67d7acf1a3db762bc900408a8dd1434cc1f Mon Sep 17 00:00:00 2001 From: chausman <chausman@nrao.edu> Date: Wed, 5 Oct 2022 13:01:14 -0600 Subject: [PATCH] add image caching message handeling --- shared/workspaces/workspaces/workflow/enum.py | 1 + .../workspaces/workflow/message_architect.py | 10 ++++++++++ .../workflow/services/workflow_service.py | 20 +++++++++++++++++-- 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/shared/workspaces/workspaces/workflow/enum.py b/shared/workspaces/workspaces/workflow/enum.py index f3dd71de2..84b7b745d 100644 --- a/shared/workspaces/workspaces/workflow/enum.py +++ b/shared/workspaces/workspaces/workflow/enum.py @@ -46,6 +46,7 @@ class ArchiveWorkflows(Enum): QUICKLOOK = "vlass_quicklook" VLASS_CAL = "vlass_calibration" INGEST_SECI = "ingest_seci" + IMG_CACHE = "vlass_image_caching" @classmethod def values(cls): diff --git a/shared/workspaces/workspaces/workflow/message_architect.py b/shared/workspaces/workspaces/workflow/message_architect.py index 289e2a421..db4266f4a 100644 --- a/shared/workspaces/workspaces/workflow/message_architect.py +++ b/shared/workspaces/workspaces/workflow/message_architect.py @@ -148,6 +148,16 @@ archive_msg_templates = DictView( "message": "calibration complete", "status": "complete", }, + "vlass_caching_failed": { + "application": "workflow", + "message": "image caching failed", + "status": "failed", + }, + "vlass_caching_complete": { + "application": "workflow", + "message": "image caching complete", + "status": "complete", + }, "do_not_calibrate": { "application": "qa-script", "logData": { diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index e7318bd5c..f4ebaae71 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -979,6 +979,14 @@ class WorkflowMessageHandler: ) self.archive_messenger.send_message(**cal_msg) + if wf_request.workflow_name == ArchiveWorkflows.IMG_CACHE.value: + logger.debug(f"Sending failed image caching message to VLASS Manager for request #{wf_id}") + routing_key = f"ws-workflow.caching.{wf_id}" + caching_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( + "vlass_caching_failed" + ) + self.archive_messenger.send_message(**caching_msg) + def send_archive_complete_event(self, **message: Dict): subject = message["subject"] wf_id = subject["workflow_request_id"] @@ -1001,14 +1009,22 @@ class WorkflowMessageHandler: self.archive_messenger.send_message(**cube_msg) if wf_request.workflow_name == ArchiveWorkflows.QUICKLOOK.value: - logger.debug(f"Sending quicklook message to VLASS Manager for request #{wf_id}") + logger.debug(f"Sending quicklook message complete to VLASS Manager for request #{wf_id}") routing_key = f"ws-workflow.quicklook.{wf_id}" ql_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message("ql_complete") self.archive_messenger.send_message(**ql_msg) if wf_request.workflow_name == ArchiveWorkflows.VLASS_CAL.value: - logger.debug(f"Sending calibration message to VLASS Manager for request #{wf_id}") + logger.debug(f"Sending calibration message complete to VLASS Manager for request #{wf_id}") routing_key = f"ws-workflow.calibration.{wf_id}" + cache_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( + "vlass_caching_complete" + ) + self.archive_messenger.send_message(**cache_msg) + + if wf_request.workflow_name == ArchiveWorkflows.IMG_CACHE.value: + logger.debug(f"Sending image caching message complete to VLASS Manager for request #{wf_id}") + routing_key = f"ws-workflow.caching.{wf_id}" cal_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( "vlass_cal_complete" ) -- GitLab