Commit 972bb268 authored by Nathan Bockisch

WS-1013: Create the pims_split Workspaces workflow

parent c7074643
"""add pimscache split workflow
Revision ID: e8e6d54d8444
Revises: 55e5b37d1ccf
Create Date: 2022-09-21 11:19:03.245980
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'e8e6d54d8444'
down_revision = '55e5b37d1ccf'
branch_labels = None
depends_on = None
# Main DAG for the workflow
pims_split_dag = """JOB RESTORE restore.condor
{{#splits}}
JOB {{.}} split.condor
VARS {{.}} jobname="$(JOB)" split_dir="$({{.}})"
{{/splits}}
JOB FINISH write_finished_file.condor
PARENT RESTORE CHILD {{#splits}}{{.}} {{/splits}}
PARENT {{#splits}}{{.}} {{/splits}} CHILD FINISH
"""
# Restore job
restore_condor = """executable = restore.sh
arguments = {{product_locator}} {{cal_locator}}
output = restore.out
error = restore.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
{{^existingRestore}}
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SPOOL_DIR)/rawdata, nraorsync://$(SPOOL_DIR)/products, nraorsync://$(SPOOL_DIR)/working, nraorsync://$(SPOOL_DIR)/metadata.json, nraorsync://$(SPOOL_DIR)/PPR.xml, nraorsync://$(SBIN_PATH)/productfetcher, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(SBIN_PATH)/vela
transfer_output_files = .job.ad
+nrao_output_files = "rawdata working products"
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
{{/existingRestore}}
{{#existingRestore}}
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/null
{{/existingRestore}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
queue
"""
restore_sh = """#!/bin/sh
set -o errexit
{{^existingRestore}}
cd rawdata/
../productfetcher --product-locator $1 --product-locator $2
cd ../
./casa_envoy --restore -c metadata.json PPR.xml
{{/existingRestore}}
{{#existingRestore}}
./null -g
{{/existingRestore}}
"""
# Job run on each split
split_condor = """executable = split.sh
arguments = "$(split_dir)"
SPLIT_DIR = "$(split_dir)"
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
PIMS_PATH = /lustre/aoc/cluster/pipeline/vlass_{{vlass_env}}/workspaces/bin
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(PIMS_PATH)/pimscache, nraorsync://$(SPOOL_DIR)/metadata.json, nraorsync://$(SPOOL_DIR)/tiles/$(SPLIT_DIR)
transfer_output_files = .job.ad
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
queue
"""
split_sh = """#!/bin/sh
set -o errexit
# Link the measurement set into the split's rawdata directory, run CASA via
# casa_envoy, and populate the cache area.
# The split directory argument has the form <tile>/<phase_center>
TILE=$(echo $1 | cut -d '/' -f 1)
PHCENTER=$(echo $1 | cut -d '/' -f 2)
# Get the measurement set path
{{^existingRestore}}
MS={{data_location}}/working/*.ms
{{/existingRestore}}
{{#existingRestore}}
MS={{existingRestore}}
{{/existingRestore}}
# Link it in the split's rawdata
ln -s $MS $1/rawdata/
# Run CASA
./casa_envoy --split metadata.json $1/PPR.xml
# Populate cache
./pimscache cp -c {{vlass_product}} -t $TILE -p $PHCENTER $MS
"""
# Finish file creation
write_finished_file_condor = """executable = write_finished_file.sh
output = write_finished.out
error = write_finished.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo
transfer_output_files = .job.ad
+nrao_output_files = "finished"
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
queue
"""
write_finished_file_sh = """#!/bin/sh
set -o errexit
cd {{data_location}}
/bin/date > finished
"""
def upgrade():
    op.execute(
        """
        INSERT INTO workflows (workflow_name) VALUES ('pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('pims_split.dag', E'{pims_split_dag}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('restore.condor', E'{restore_condor}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('restore.sh', E'{restore_sh}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('split.condor', E'{split_condor}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('split.sh', E'{split_sh}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('write_finished_file.condor', E'{write_finished_file_condor}', 'pims_split')
        """
    )
    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name)
        VALUES ('write_finished_file.sh', E'{write_finished_file_sh}', 'pims_split')
        """
    )


def downgrade():
    pass