From 972bb268954ef32dd2da76859e8e34ab7f5e1e0e Mon Sep 17 00:00:00 2001
From: Nathan Bockisch <nbockisc@nrao.edu>
Date: Mon, 26 Sep 2022 17:37:54 -0400
Subject: [PATCH] WS-1013: Create the pims_split Workspaces workflow

---
 ...e6d54d8444_add_pimscache_split_workflow.py | 222 ++++++++++++++++++
 1 file changed, 222 insertions(+)
 create mode 100644 shared/workspaces/alembic/versions/e8e6d54d8444_add_pimscache_split_workflow.py

diff --git a/shared/workspaces/alembic/versions/e8e6d54d8444_add_pimscache_split_workflow.py b/shared/workspaces/alembic/versions/e8e6d54d8444_add_pimscache_split_workflow.py
new file mode 100644
index 000000000..c622b2567
--- /dev/null
+++ b/shared/workspaces/alembic/versions/e8e6d54d8444_add_pimscache_split_workflow.py
@@ -0,0 +1,222 @@
"""add pimscache split workflow

Revision ID: e8e6d54d8444
Revises: 55e5b37d1ccf
Create Date: 2022-09-21 11:19:03.245980

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'e8e6d54d8444'
down_revision = '55e5b37d1ccf'
branch_labels = None
depends_on = None

# Main DAG for the workflow
pims_split_dag = """JOB RESTORE restore.condor

{{#splits}}
JOB {{.}} split.condor
VARS {{.}} jobname="$(JOB)" split_dir="$({{.}})"
{{/splits}}

JOB FINISH write_finished_file.condor

PARENT RESTORE CHILD {{#splits}}{{.}} {{/splits}}
PARENT {{#splits}}{{.}} {{/splits}} CHILD FINISH
"""

# Restore job
restore_condor = """executable = restore.sh
arguments = {{product_locator}} {{cal_locator}}

output = restore.out
error = restore.err
log = condor.log

SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
{{^existingRestore}}
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SPOOL_DIR)/rawdata, nraorsync://$(SPOOL_DIR)/products, nraorsync://$(SPOOL_DIR)/working, nraorsync://$(SPOOL_DIR)/metadata.json, nraorsync://$(SPOOL_DIR)/PPR.xml, nraorsync://$(SBIN_PATH)/productfetcher, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(SBIN_PATH)/vela

transfer_output_files = .job.ad
+nrao_output_files = "rawdata working products"
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
{{/existingRestore}}
{{#existingRestore}}
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/null
{{/existingRestore}}

getenv = True
environment = "CAPO_PATH=/home/casa/capo"

requirements = HasLustre == True

queue

"""

restore_sh = """#!/bin/sh
set -o errexit

{{^existingRestore}}
cd rawdata/
../productfetcher --product-locator $1 --product-locator $2
cd ../
./casa_envoy --restore -c metadata.json PPR.xml
{{/existingRestore}}
{{#existingRestore}}
./null -g
{{/existingRestore}}
"""

# Workflow run on each split
split_condor = """executable = split.sh
arguments = "$(split_dir)"

SPLIT_DIR = "$(split_dir)"
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
PIMS_PATH = /lustre/aoc/cluster/pipeline/vlass_{{vlass_env}}/workspaces/bin
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(PIMS_PATH)/pimscache, nraorsync://$(SPOOL_DIR)/metadata.json, nraorsync://$(SPOOL_DIR)/tiles/$(SPLIT_DIR)
transfer_output_files = .job.ad
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True

getenv = True
environment = "CAPO_PATH=/home/casa/capo"

requirements = HasLustre == True

queue

"""

split_sh = """#!/bin/sh
set -o errexit

# Link the measurement set into the split's rawdata directory, run CASA via
# casa_envoy, then populate the PIMS cache area

TILE=$(echo $1 | cut -d '/' -f 1)
PHCENTER=$(echo $1 | cut -d '/' -f 2)

# Get the measurement set path
{{^existingRestore}}
MS={{data_location}}/working/*.ms
{{/existingRestore}}
{{#existingRestore}}
MS={{existingRestore}}
{{/existingRestore}}

# Link it in the split's rawdata
ln -s $MS $1/rawdata/

# Run CASA
./casa_envoy --split metadata.json $1/PPR.xml

# Populate cache
./pimscache cp -c {{vlass_product}} -t $TILE -p $PHCENTER $MS

"""

# Finish file creation
write_finished_file_condor = """executable = write_finished_file.sh

output = write_finished.out
error = write_finished.err
log = condor.log

SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo
transfer_output_files = .job.ad
+nrao_output_files = "finished"
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True

getenv = True
environment = "CAPO_PATH=/home/casa/capo"

requirements = HasLustre == True

queue

"""

write_finished_file_sh = """#!/bin/sh
set -o errexit

cd {{data_location}}
/bin/date > finished

"""

def upgrade():
    op.execute(
        """
        INSERT INTO workflows (workflow_name) VALUES ('pims_split')
        """
    )

    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('pims_split.dag', E'{pims_split_dag}', 'pims_split')
        """
    )

    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('restore.condor', E'{restore_condor}', 'pims_split')
        """
    )

    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('restore.sh', E'{restore_sh}', 'pims_split')
        """
    )

    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('split.condor', E'{split_condor}', 'pims_split')
        """
    )

    # split_sh contains single quotes; double them so the E'...' literal below
    # is not terminated early
    split_sh_escaped = split_sh.replace("'", "''")

    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('split.sh', E'{split_sh_escaped}', 'pims_split')
        """
    )

    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('write_finished_file.condor', E'{write_finished_file_condor}', 'pims_split')
        """
    )

    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('write_finished_file.sh', E'{write_finished_file_sh}', 'pims_split')
        """
    )


def downgrade():
    pass
--
GitLab
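
Note: the migration leaves downgrade() as a no-op, so rolling back the revision would leave the inserted rows behind. If a reversible migration is wanted, a minimal sketch could delete exactly what upgrade() adds, assuming nothing else references the 'pims_split' rows (table and column names are taken from the upgrade statements above):

    def downgrade():
        # Remove the templates first, then the workflow row they belong to
        op.execute("DELETE FROM workflow_templates WHERE workflow_name = 'pims_split'")
        op.execute("DELETE FROM workflows WHERE workflow_name = 'pims_split'")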
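
Note: the templates are Mustache: {{#splits}}...{{/splits}} repeats its body once per split, and {{^existingRestore}}/{{#existingRestore}} switch between a fresh restore and reuse of an existing one. This patch does not show which renderer the Workspaces system uses; the sketch below uses the pystache library and made-up split names purely to illustrate how pims_split.dag expands into one JOB/VARS pair per split:

    import pystache  # assumption: any Mustache implementation would do

    # Hypothetical context; real values come from the Workspaces request
    context = {"splits": ["tile1/phasecenter1", "tile2/phasecenter2"]}

    # pims_split_dag is the template string defined in the migration above;
    # each list item becomes a JOB line plus its VARS line in the rendered DAG
    print(pystache.render(pims_split_dag, context))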