From c5321622cfea1f764f8b517e979b1221a05fb6b0 Mon Sep 17 00:00:00 2001 From: chausman <chausman@nrao.edu> Date: Mon, 12 Sep 2022 14:04:37 -0600 Subject: [PATCH] template fixes and messaging --- ...d6938d2307_add_vlass_calibration_templates.py | 12 +++++++++--- shared/workspaces/workspaces/workflow/enum.py | 1 + .../workspaces/workflow/message_architect.py | 10 ++++++++++ .../workflow/services/workflow_service.py | 16 ++++++++++++++++ 4 files changed, 36 insertions(+), 3 deletions(-) diff --git a/shared/workspaces/alembic/versions/5dd6938d2307_add_vlass_calibration_templates.py b/shared/workspaces/alembic/versions/5dd6938d2307_add_vlass_calibration_templates.py index d4f99fdb9..5ff5a9475 100644 --- a/shared/workspaces/alembic/versions/5dd6938d2307_add_vlass_calibration_templates.py +++ b/shared/workspaces/alembic/versions/5dd6938d2307_add_vlass_calibration_templates.py @@ -6,7 +6,6 @@ Create Date: 2022-09-12 10:01:22.749122 """ from alembic import op -import sqlalchemy as sa # revision identifiers, used by Alembic. @@ -75,7 +74,7 @@ log = condor.log VLASS_DIR = {{data_location}} SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin should_transfer_files = yes -transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/.matplotlib, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(VLASS_DIR)/working, nraorsync://$(VLASS_DIR)/rawdata, nraorsync://$(VLASS_DIR)/products, nraorsync://$(VLASS_DIR)/{{profile}}.properties, nraorsync://$(VLASS_DIR)/PPR.xml, nraorsync://$(VLASS_DIR)/metadata.json +transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/.matplotlib, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(VLASS_DIR)/working, nraorsync://$(VLASS_DIR)/rawdata, nraorsync://$(VLASS_DIR)/products{{#remote}}, nraorsync://$(VLASS_DIR)/{{profile}}.properties{{/remote}}, nraorsync://$(VLASS_DIR)/PPR.xml, nraorsync://$(VLASS_DIR)/metadata.json when_to_transfer_output = ON_EXIT transfer_output_files = .job.ad +nrao_output_files = "working products" @@ -84,8 +83,11 @@ output_destination = nraorsync://$(VLASS_DIR) request_memory = 80G getenv = True -environment = "CAPO_PATH=." +{{^remote}} +environment = "CAPO_PATH=/home/casa/capo" +requirements = HasLustre == True +{{/remote}} {{#remote}} requirements = (VLASS == True) +partition = "VLASS" @@ -99,6 +101,10 @@ queue envoy_script = """#!/bin/sh export HOME=$TMPDIR +{{#remote}} +export CAPO_PATH=. +export CAPO_PROFILE={{profile}} +{{/remote}} set -o errexit diff --git a/shared/workspaces/workspaces/workflow/enum.py b/shared/workspaces/workspaces/workflow/enum.py index 02eb85f0c..d60b575d6 100644 --- a/shared/workspaces/workspaces/workflow/enum.py +++ b/shared/workspaces/workspaces/workflow/enum.py @@ -44,6 +44,7 @@ class ArchiveWorkflows(Enum): SECI = "vlass_seci" CUBE = "vlass_coarse" QUICKLOOK = "vlass_quicklook" + VLASS_CAL = "vlass_calibration" INGEST_SECI = "ingest_seci" @classmethod diff --git a/shared/workspaces/workspaces/workflow/message_architect.py b/shared/workspaces/workspaces/workflow/message_architect.py index 15ba7334c..289e2a421 100644 --- a/shared/workspaces/workspaces/workflow/message_architect.py +++ b/shared/workspaces/workspaces/workflow/message_architect.py @@ -138,6 +138,16 @@ archive_msg_templates = DictView( "message": "quicklook complete", "status": "complete", }, + "vlass_cal_failed": { + "application": "workflow", + "message": "calibration failed", + "status": "failed", + }, + "vlass_cal_complete": { + "application": "workflow", + "message": "calibration complete", + "status": "complete", + }, "do_not_calibrate": { "application": "qa-script", "logData": { diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index b013bee9b..8711b765b 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -970,6 +970,14 @@ class WorkflowMessageHandler: ql_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message("ql_failed") self.archive_messenger.send_message(**ql_msg) + if wf_request.workflow_name == ArchiveWorkflows.VLASS_CAL.value: + logger.debug(f"Sending failed calibration message to VLASS Manager for request #{wf_id}") + routing_key = f"ws-workflow.calibration.{wf_id}" + cal_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( + "vlass_cal_failed" + ) + self.archive_messenger.send_message(**cal_msg) + def send_archive_complete_event(self, **message: Dict): subject = message["subject"] wf_id = subject["workflow_request_id"] @@ -997,6 +1005,14 @@ class WorkflowMessageHandler: ql_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message("ql_complete") self.archive_messenger.send_message(**ql_msg) + if wf_request.workflow_name == ArchiveWorkflows.QUICKLOOK.value: + logger.debug(f"Sending calibration message to VLASS Manager for request #{wf_id}") + routing_key = f"ws-workflow.calibration.{wf_id}" + cal_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( + "vlass_cal_complete" + ) + self.archive_messenger.send_message(**cal_msg) + @staticmethod def clean_remote_workflow(request: WorkflowRequest): injector = CapoInjector(request) -- GitLab