
WS-1013: Create the pims_split Workspaces workflow

Merged Nathan Bockisch requested to merge pims-split-foo into main
"""add pimscache split workflow
Revision ID: e8e6d54d8444
Revises: 55e5b37d1ccf
Create Date: 2022-09-21 11:19:03.245980
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e8e6d54d8444'
down_revision = '55e5b37d1ccf'
branch_labels = None
depends_on = None
# Main DAG for the workflow
pims_split_dag = """JOB RESTORE restore_cms.condor
{{#splits}}
JOB {{.}} split_workflow.condor
VARS {{.}} jobname="$(JOB)" split_dir="$({{.}})"
{{/splits}}
JOB FINISH write_finished_file.condor
PARENT RESTORE CHILD {{#splits}}{{.}} {{/splits}}
PARENT {{#splits}}{{.}} {{/splits}}CHILD FINISH
"""
# Restore job
restore_cms_condor = """executable = restore_cms.sh
arguments = {{product_locator}} {{cal_locator}} metadata.json PPR.xml
output = restore_cms.out
error = restore_cms.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/productfetcher, nraorsync://$(SBIN_PATH)/casa_envoy
transfer_output_files = .job.ad
+nrao_output_files = "rawdata"
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
queue
"""
restore_cms_sh = """#!/bin/sh
set -o errexit
cd {{data_location}}
{{^existingRestore}}
cd rawdata/
../productfetcher --product-locator $1 --product-locator $2
cd ../
./casa_envoy --restore -c $3 $4
{{/existingRestore}}
"""
# Workflow run on each split
split_workflow_condor = """executable = split_workflow.sh
arguments = "$(split_dir)"
SPLIT_DIR = "$(split_dir)"
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
VLASS_DIR = {{data_location}}
SPOOL_DIR = {{spool_dir}}
PIMS_PATH = /lustre/aoc/cluster/pipeline/vlass_{{vlass_env}}/workspaces/bin
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(PIMS_PATH)/pimscache, nraorsync://$(VLASS_DIR)/metadata.json, nraorsync://$(VLASS_DIR)/working, nraorsync://$(VLASS_DIR)/tiles/$(SPLIT_DIR)
transfer_output_files = .job.ad
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
queue
"""
split_workflow_sh = """#!/bin/sh
set -o errexit
# Something to link measurement set to rawdata directory
# CASA envoy call?
# Populate the cache area
TILE=$(echo $1 | cut -d/ -f1)
PHCENTER=$(echo $1 | cut -d/ -f2)
# Get the measurement set path
{{^existingRestore}}
MS={{data_location}}/*.ms
{{/existingRestore}}
{{#existingRestore}}
MS={{existingRestore}}
{{/existingRestore}}
# Link it in the split's rawdata
ln -s $MS $1/rawdata/
# Run CASA
./casa_envoy --split metadata.json $1/PPR.xml
# Populate cache
./pimscache cp -c {{vlass_product}} -t $TILE -p $PHCENTER $MS
"""
# Finish file creation
write_finished_file_condor = """executable = write_finished_file.sh
output = write_finished.out
error = write_finished.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo
transfer_output_files = .job.ad
+nrao_output_files = "finished"
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
queue
"""
write_finished_file_sh = """#!/bin/sh
set -o errexit
cd {{data_location}}
/bin/date > finished
"""
def upgrade():
    op.execute(
        """
        INSERT INTO workflows (workflow_name) VALUES ('pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('pims_split.dag', E'{pims_split_dag}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('restore_cms.condor', E'{restore_cms_condor}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('restore_cms.sh', E'{restore_cms_sh}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('split_workflow.condor', E'{split_workflow_condor}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('split_workflow.sh', E'{split_workflow_sh}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('write_finished_file.condor', E'{write_finished_file_condor}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('write_finished_file.sh', E'{write_finished_file_sh}', 'pims_split')
        """
    )


def downgrade():
    pass
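# A minimal downgrade sketch, assuming the desired reversal is simply removing
# the rows inserted by upgrade() (shown as a comment since downgrade() above
# is a no-op):
#
#   def downgrade():
#       op.execute("DELETE FROM workflow_templates WHERE workflow_name = 'pims_split'")
#       op.execute("DELETE FROM workflows WHERE workflow_name = 'pims_split'")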