From c3a6a35ae4cd63fb11cbba9e19f0c08420926b5e Mon Sep 17 00:00:00 2001 From: James Sheckard <jsheckar@nrao.edu> Date: Thu, 30 Jun 2022 11:05:35 -0400 Subject: [PATCH] WS-1082: register cube workflow --- shared/workspaces/workspaces/system/enums.py | 1 + shared/workspaces/workspaces/workflow/enum.py | 1 + .../workflow/services/workflow_service.py | 16 ++++++++++++++++ 3 files changed, 18 insertions(+) diff --git a/shared/workspaces/workspaces/system/enums.py b/shared/workspaces/workspaces/system/enums.py index 5ce5ca60b..881f08fe6 100644 --- a/shared/workspaces/workspaces/system/enums.py +++ b/shared/workspaces/workspaces/system/enums.py @@ -43,6 +43,7 @@ class RemoteWorkflows(Enum): """ SECI = "vlass_seci" + CUBE = "vlass_coarse" DOWNLOAD = "test_download" # listed as remote due to ALMA downloads running in CV, but still requires lustre access CARTA = "carta" STD_CALIBRATION = "std_calibration" diff --git a/shared/workspaces/workspaces/workflow/enum.py b/shared/workspaces/workspaces/workflow/enum.py index 748c649b2..7128ccae1 100644 --- a/shared/workspaces/workspaces/workflow/enum.py +++ b/shared/workspaces/workspaces/workflow/enum.py @@ -42,6 +42,7 @@ class ArchiveWorkflows(Enum): CARTA = "carta" SECI = "vlass_seci" + CUBE = "vlass_coarse" INGEST_SECI = "ingest_seci" @classmethod diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index cc50db3e8..d40dd40ba 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -958,6 +958,14 @@ class WorkflowMessageHandler: ) self.archive_messenger.send_message(**seci_msg) + if wf_request.workflow_name == ArchiveWorkflows.CUBE.value: + logger.info(f"SENDING FAILED COARSE CUBE MESSAGE to VLASS Manager for request #{wf_id}!") + routing_key = f"ws-workflow.coarse_cube.{wf_id}" + cube_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( + "coarse_cube_failed" + ) + self.archive_messenger.send_message(**cube_msg) + def send_archive_complete_event(self, **message: Dict): subject = message["subject"] wf_id = subject["workflow_request_id"] @@ -971,6 +979,14 @@ class WorkflowMessageHandler: ) self.archive_messenger.send_message(**seci_msg) + if wf_request.workflow_name == ArchiveWorkflows.CUBE.value: + logger.info(f"SENDING COARSE CUBE COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!") + routing_key = f"ws-workflow.coarse_cube.{wf_id}" + cube_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( + "coarse_cube_complete" + ) + self.archive_messenger.send_message(**cube_msg) + @staticmethod def clean_remote_workflow(request: WorkflowRequest): injector = CapoInjector(request) -- GitLab