Skip to content
Snippets Groups Projects
Commit c5321622 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

template fixes and messaging

parent 7eb4813b
No related branches found
No related tags found
1 merge request!1064initial vlass calibration templates
Pipeline #6638 canceled
...@@ -6,7 +6,6 @@ Create Date: 2022-09-12 10:01:22.749122 ...@@ -6,7 +6,6 @@ Create Date: 2022-09-12 10:01:22.749122
""" """
from alembic import op from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
...@@ -75,7 +74,7 @@ log = condor.log ...@@ -75,7 +74,7 @@ log = condor.log
VLASS_DIR = {{data_location}} VLASS_DIR = {{data_location}}
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/.matplotlib, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(VLASS_DIR)/working, nraorsync://$(VLASS_DIR)/rawdata, nraorsync://$(VLASS_DIR)/products, nraorsync://$(VLASS_DIR)/{{profile}}.properties, nraorsync://$(VLASS_DIR)/PPR.xml, nraorsync://$(VLASS_DIR)/metadata.json transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/.matplotlib, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(VLASS_DIR)/working, nraorsync://$(VLASS_DIR)/rawdata, nraorsync://$(VLASS_DIR)/products{{#remote}}, nraorsync://$(VLASS_DIR)/{{profile}}.properties{{/remote}}, nraorsync://$(VLASS_DIR)/PPR.xml, nraorsync://$(VLASS_DIR)/metadata.json
when_to_transfer_output = ON_EXIT when_to_transfer_output = ON_EXIT
transfer_output_files = .job.ad transfer_output_files = .job.ad
+nrao_output_files = "working products" +nrao_output_files = "working products"
...@@ -84,8 +83,11 @@ output_destination = nraorsync://$(VLASS_DIR) ...@@ -84,8 +83,11 @@ output_destination = nraorsync://$(VLASS_DIR)
request_memory = 80G request_memory = 80G
getenv = True getenv = True
environment = "CAPO_PATH=."
{{^remote}}
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
{{/remote}}
{{#remote}} {{#remote}}
requirements = (VLASS == True) requirements = (VLASS == True)
+partition = "VLASS" +partition = "VLASS"
...@@ -99,6 +101,10 @@ queue ...@@ -99,6 +101,10 @@ queue
envoy_script = """#!/bin/sh envoy_script = """#!/bin/sh
export HOME=$TMPDIR export HOME=$TMPDIR
{{#remote}}
export CAPO_PATH=.
export CAPO_PROFILE={{profile}}
{{/remote}}
set -o errexit set -o errexit
......
...@@ -44,6 +44,7 @@ class ArchiveWorkflows(Enum): ...@@ -44,6 +44,7 @@ class ArchiveWorkflows(Enum):
SECI = "vlass_seci" SECI = "vlass_seci"
CUBE = "vlass_coarse" CUBE = "vlass_coarse"
QUICKLOOK = "vlass_quicklook" QUICKLOOK = "vlass_quicklook"
VLASS_CAL = "vlass_calibration"
INGEST_SECI = "ingest_seci" INGEST_SECI = "ingest_seci"
@classmethod @classmethod
......
...@@ -138,6 +138,16 @@ archive_msg_templates = DictView( ...@@ -138,6 +138,16 @@ archive_msg_templates = DictView(
"message": "quicklook complete", "message": "quicklook complete",
"status": "complete", "status": "complete",
}, },
"vlass_cal_failed": {
"application": "workflow",
"message": "calibration failed",
"status": "failed",
},
"vlass_cal_complete": {
"application": "workflow",
"message": "calibration complete",
"status": "complete",
},
"do_not_calibrate": { "do_not_calibrate": {
"application": "qa-script", "application": "qa-script",
"logData": { "logData": {
......
...@@ -970,6 +970,14 @@ class WorkflowMessageHandler: ...@@ -970,6 +970,14 @@ class WorkflowMessageHandler:
ql_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message("ql_failed") ql_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message("ql_failed")
self.archive_messenger.send_message(**ql_msg) self.archive_messenger.send_message(**ql_msg)
if wf_request.workflow_name == ArchiveWorkflows.VLASS_CAL.value:
logger.debug(f"Sending failed calibration message to VLASS Manager for request #{wf_id}")
routing_key = f"ws-workflow.calibration.{wf_id}"
cal_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
"vlass_cal_failed"
)
self.archive_messenger.send_message(**cal_msg)
def send_archive_complete_event(self, **message: Dict): def send_archive_complete_event(self, **message: Dict):
subject = message["subject"] subject = message["subject"]
wf_id = subject["workflow_request_id"] wf_id = subject["workflow_request_id"]
...@@ -997,6 +1005,14 @@ class WorkflowMessageHandler: ...@@ -997,6 +1005,14 @@ class WorkflowMessageHandler:
ql_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message("ql_complete") ql_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message("ql_complete")
self.archive_messenger.send_message(**ql_msg) self.archive_messenger.send_message(**ql_msg)
if wf_request.workflow_name == ArchiveWorkflows.QUICKLOOK.value:
logger.debug(f"Sending calibration message to VLASS Manager for request #{wf_id}")
routing_key = f"ws-workflow.calibration.{wf_id}"
cal_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
"vlass_cal_complete"
)
self.archive_messenger.send_message(**cal_msg)
@staticmethod @staticmethod
def clean_remote_workflow(request: WorkflowRequest): def clean_remote_workflow(request: WorkflowRequest):
injector = CapoInjector(request) injector = CapoInjector(request)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment