diff --git a/shared/workspaces/alembic/versions/5dd6938d2307_add_vlass_calibration_templates.py b/shared/workspaces/alembic/versions/5dd6938d2307_add_vlass_calibration_templates.py new file mode 100644 index 0000000000000000000000000000000000000000..5ff5a9475ff05f22376f45178753d14f3470d786 --- /dev/null +++ b/shared/workspaces/alembic/versions/5dd6938d2307_add_vlass_calibration_templates.py @@ -0,0 +1,203 @@ +"""add vlass calibration templates + +Revision ID: 5dd6938d2307 +Revises: 1bf29ab04f27 +Create Date: 2022-09-12 10:01:22.749122 + +""" +from alembic import op + + +# revision identifiers, used by Alembic. +revision = "5dd6938d2307" +down_revision = "1bf29ab04f27" +branch_labels = None +depends_on = None + + +dag = """JOB FETCH vlass_calibration_fetch.condor +VARS FETCH jobname="$(JOB)" + +JOB ENVOY vlass_calibration_envoy.condor +VARS ENVOY jobname="$(JOB)" + +JOB CONVEY vlass_calibration_convey.condor +VARS CONVEY jobname="$(JOB)" + +PARENT FETCH CHILD ENVOY +PARENT ENVOY CHILD CONVEY +""" + +fetch_condor = """executable = vlass_calibration_fetch.sh +arguments = {{product_locator}} + +output = calibration_fetch.out +error = calibration_fetch.err +log = condor.log + +SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin +SPOOL_DIR = {{data_location}} +should_transfer_files = yes +transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/productfetcher, nraorsync://$(SPOOL_DIR)/rawdata +when_to_transfer_output = ON_EXIT +transfer_output_files = .job.ad ++nrao_output_files = "rawdata" +output_destination = nraorsync://$(SPOOL_DIR) ++WantIOProxy = True + +getenv = True +environment = "CAPO_PATH=/home/casa/capo" + +requirements = HasLustre == True + +queue +""" + +fetch_script = """#!/bin/sh +set -o errexit + +export HOME=$TMPDIR + +chmod 770 . +cd rawdata/ +../productfetcher --product-locator $1 +cd ../ +""" + +envoy_condor = """executable = vlass_calibration_envoy.sh +arguments = metadata.json PPR.xml + +output = calibration_envoy.out +error = calibration_envoy.err +log = condor.log + +VLASS_DIR = {{data_location}} +SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin +should_transfer_files = yes +transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/.matplotlib, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(VLASS_DIR)/working, nraorsync://$(VLASS_DIR)/rawdata, nraorsync://$(VLASS_DIR)/products{{#remote}}, nraorsync://$(VLASS_DIR)/{{profile}}.properties{{/remote}}, nraorsync://$(VLASS_DIR)/PPR.xml, nraorsync://$(VLASS_DIR)/metadata.json +when_to_transfer_output = ON_EXIT +transfer_output_files = .job.ad ++nrao_output_files = "working products" +output_destination = nraorsync://$(VLASS_DIR) ++WantIOProxy = True + +request_memory = 80G +getenv = True + +{{^remote}} +environment = "CAPO_PATH=/home/casa/capo" +requirements = HasLustre == True +{{/remote}} +{{#remote}} +requirements = (VLASS == True) ++partition = "VLASS" +Rank = (TARGET.VLASS == True) + (TARGET.VLASSTEST =!= True) +{{/remote}} + +queue + +""" + +envoy_script = """#!/bin/sh + +export HOME=$TMPDIR +{{#remote}} +export CAPO_PATH=. +export CAPO_PROFILE={{profile}} +{{/remote}} + +set -o errexit + +./casa_envoy --vlass-cal $1 $2 +""" + +convey_condor = """executable = vlass_calibration_convey.sh +arguments = metadata.json + +output = calibration_convey.out +error = calibration_convey.err +log = condor.log + +SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin +SPOOL_DIR = {{data_location}} +should_transfer_files = yes +transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/conveyor, nraorsync://$(SPOOL_DIR)/metadata.json, nraorsync://$(SPOOL_DIR)/working, nraorsync://$(SPOOL_DIR)/rawdata, nraorsync://$(SPOOL_DIR)/products ++WantIOProxy = True + +getenv = True +environment = "CAPO_PATH=/home/casa/capo" +requirements = HasLustre == True + +queue +""" + +convey_script = """#!/bin/sh +set -o errexit + +export HOME=$TMPDIR + +./conveyor --vlass $1 +""" + + +def upgrade(): + op.execute( + """ + INSERT INTO workflows (workflow_name, uses_casa) VALUES ('vlass_calibration', true); + """ + ) + op.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES ('vlass_calibration.dag', E'{dag}', 'vlass_calibration') + """ + ) + op.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES ('vlass_calibration_fetch.condor', E'{fetch_condor}', 'vlass_calibration') + """ + ) + op.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES ('vlass_calibration_fetch.sh', E'{fetch_script}', 'vlass_calibration') + """ + ) + op.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES ('vlass_calibration_envoy.condor', E'{envoy_condor}', 'vlass_calibration') + """ + ) + op.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES ('vlass_calibration_envoy.sh', E'{envoy_script}', 'vlass_calibration') + """ + ) + op.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES ('vlass_calibration_convey.condor', E'{convey_condor}', 'vlass_calibration') + """ + ) + op.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES ('vlass_calibration_convey.sh', E'{convey_script}', 'vlass_calibration') + """ + ) + + +def downgrade(): + op.execute( + """ + DELETE FROM workflow_templates WHERE workflow_name = 'vlass_calibration' + """ + ) + op.execute( + """ + DELETE FROM workflows WHERE workflow_name = 'vlass_calibration' + """ + ) diff --git a/shared/workspaces/workspaces/system/enums.py b/shared/workspaces/workspaces/system/enums.py index bb2e7b175b2456842edebb73424f6413ad6096ba..2cee81fc185659a4578b5f2f9efcbe11bf9832a6 100644 --- a/shared/workspaces/workspaces/system/enums.py +++ b/shared/workspaces/workspaces/system/enums.py @@ -45,6 +45,7 @@ class RemoteWorkflows(Enum): SECI = "vlass_seci" CUBE = "vlass_coarse" QUICKLOOK = "vlass_quicklook" + VLASS_CAL = "vlass_calibration" DOWNLOAD = "test_download" # listed as remote due to ALMA downloads running in CV, but still requires lustre access CARTA = "carta" STD_CALIBRATION = "std_calibration" diff --git a/shared/workspaces/workspaces/workflow/enum.py b/shared/workspaces/workspaces/workflow/enum.py index 02eb85f0cb3db1f6b0bc35deda17e11be033767b..d60b575d6f5c5394aadaf45d11b070c890e2f10a 100644 --- a/shared/workspaces/workspaces/workflow/enum.py +++ b/shared/workspaces/workspaces/workflow/enum.py @@ -44,6 +44,7 @@ class ArchiveWorkflows(Enum): SECI = "vlass_seci" CUBE = "vlass_coarse" QUICKLOOK = "vlass_quicklook" + VLASS_CAL = "vlass_calibration" INGEST_SECI = "ingest_seci" @classmethod diff --git a/shared/workspaces/workspaces/workflow/message_architect.py b/shared/workspaces/workspaces/workflow/message_architect.py index 15ba7334c969599821e23c323d6abf71d14b84c0..289e2a4217ca510b5b6b6c978097893afd78b384 100644 --- a/shared/workspaces/workspaces/workflow/message_architect.py +++ b/shared/workspaces/workspaces/workflow/message_architect.py @@ -138,6 +138,16 @@ archive_msg_templates = DictView( "message": "quicklook complete", "status": "complete", }, + "vlass_cal_failed": { + "application": "workflow", + "message": "calibration failed", + "status": "failed", + }, + "vlass_cal_complete": { + "application": "workflow", + "message": "calibration complete", + "status": "complete", + }, "do_not_calibrate": { "application": "qa-script", "logData": { diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index b013bee9bb1fb0630991fe2ca75deac8184510e0..4af11ccf7975dca41c0299cd571641dffe70f5cc 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -970,6 +970,14 @@ class WorkflowMessageHandler: ql_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message("ql_failed") self.archive_messenger.send_message(**ql_msg) + if wf_request.workflow_name == ArchiveWorkflows.VLASS_CAL.value: + logger.debug(f"Sending failed calibration message to VLASS Manager for request #{wf_id}") + routing_key = f"ws-workflow.calibration.{wf_id}" + cal_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( + "vlass_cal_failed" + ) + self.archive_messenger.send_message(**cal_msg) + def send_archive_complete_event(self, **message: Dict): subject = message["subject"] wf_id = subject["workflow_request_id"] @@ -997,6 +1005,14 @@ class WorkflowMessageHandler: ql_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message("ql_complete") self.archive_messenger.send_message(**ql_msg) + if wf_request.workflow_name == ArchiveWorkflows.VLASS_CAL.value: + logger.debug(f"Sending calibration message to VLASS Manager for request #{wf_id}") + routing_key = f"ws-workflow.calibration.{wf_id}" + cal_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( + "vlass_cal_complete" + ) + self.archive_messenger.send_message(**cal_msg) + @staticmethod def clean_remote_workflow(request: WorkflowRequest): injector = CapoInjector(request)