Commit 6584ad64 authored by Nathan Bockisch

Updated split workflow with sketches of all stages

parent 5da2bf48
@@ -16,13 +16,12 @@ branch_labels = None
depends_on = None
# Main DAG for the workflow
pims_split_dag = """JOB RESTORE restore_cms_content.condor
VARS RESTORE jobname="$(JOB)"
pims_split_dag = """JOB RESTORE restore_cms.condor
-{{#phase_centers}}
-JOB {{center_name}} split.sub
-VARS {{center_name}} phase_center={{phase_center}}
-{{/phase_centers}}
+{{#splits}}
+JOB {{.}} split_workflow.condor
+VARS {{.}} jobname="$(JOB)" split_dir="$({{.}})"
+{{/splits}}
JOB FINISH write_finished_file.condor
@@ -30,16 +29,69 @@ PARENT RESTORE CHILD SPLIT
PARENT SPLIT CHILD FINISH
"""
-# DAG run on each split
-split_dag = """JOB SPLITWF split_workflow.condor
+# Restore job
+restore_cms_condor = """executable = restore_cms.sh
+arguments = {{product_locator}} {{cal_locator}} metadata.json PPR.xml
+output = restore_cms.out
+error = restore_cms.err
+log = condor.log
+SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
+SPOOL_DIR = {{spool_dir}}
+should_transfer_files = yes
+transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/productfetcher, nraorsync://$(SBIN_PATH)/casa_envoy
+transfer_output_files = .job.ad
++nrao_output_files = "rawdata"
+output_destination = nraorsync://$(SPOOL_DIR)
++WantIOProxy = True
+getenv = True
+environment = "CAPO_PATH=/home/casa/capo"
+requirements = HasLustre == True
+queue
-PARENT SPLITWF
"""
restore_cms_content = """executable = restore_cms.sh
restore_cms_sh = """#!/bin/sh
set -o errexit
+cd {{data_location}}
+{{^existingRestore}}
+cd rawdata/
+../productfetcher --product-locator $1 --product-locator $2
+cd ../
+./casa_envoy --restore -c $3 $4
+{{/existingRestore}}
"""
split_workflow_condor = """split_workflow.sh
# Workflow run on each split
split_workflow_condor = """executable = split_workflow.sh
arguments = "$(split_dir)"
SPLIT_DIR = "$(split_dir)"
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
VLASS_DIR = {{data_location}}
SPOOL_DIR = {{spool_dir}}
PIMS_PATH = /lustre/aoc/cluster/pipeline/vlass_{{vlass_env}}/workspaces/bin
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(PIMS_PATH)/pimscache, nraosync://$(VLASS_DIR)/metadata.json, nraosync://$(VLASS_DIR)/working nraosync://$(VLASS_DIR)/tiles/$(SPLIT_DIR)
transfer_output_files = .job.ad
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
queue
"""
split_workflow_sh = """#!/bin/sh
@@ -48,6 +100,27 @@ set -o errexit
-# Something to link measurement set to rawdata directory
-# CASA envoy call?
-# Populate the cache area
+TILE=$(echo $1 | cut -d '/' -f 1)
+PHCENTER=$(echo $1 | cut -d '/' -f 2)
+# Get the measurement set path
+{{^existingRestore}}
+MS={{data_location}}/*.ms
+{{/existingRestore}}
+{{#existingRestore}}
+MS={{existingRestore}}
+{{/existingRestore}}
+# Link it in the split's rawdata
+ln -s $MS $1/rawdata/
+# Run CASA
+./casa_envoy --split metadata.json $1/PPR.xml
+# Populate cache
+./pimscache cp -c {{vlass_product}} -t $TILE -p $PHCENTER $MS
"""
# Finish file creation
@@ -57,11 +130,28 @@ output = write_finished.out
error = write_finished.err
log = condor.log
+SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
+SPOOL_DIR = {{spool_dir}}
+should_transfer_files = yes
+transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo
+transfer_output_files = .job.ad
++nrao_output_files = "finished"
+output_destination = nraorsync://$(SPOOL_DIR)
++WantIOProxy = True
+getenv = True
+environment = "CAPO_PATH=/home/casa/capo"
+requirements = HasLustre == True
queue
"""
write_finished_file_sh = """#!/bin/sh
set -o errexit
cd {{data_location}}
/bin/date > finished
"""
@@ -79,6 +169,48 @@ def upgrade():
        VALUES ('pims_split.dag', E'{pims_split_dag}', 'pims_split')
        """
    )
+    op.execute(
+        f"""
+        INSERT INTO workflow_templates (filename, content, workflow_name)
+        VALUES ('restore_cms.condor', E'{restore_cms_condor}', 'pims_split')
+        """
+    )
+    op.execute(
+        f"""
+        INSERT INTO workflow_templates (filename, content, workflow_name)
+        VALUES ('restore_cms.sh', E'{restore_cms_sh}', 'pims_split')
+        """
+    )
+    op.execute(
+        f"""
+        INSERT INTO workflow_templates (filename, content, workflow_name)
+        VALUES ('split_workflow.condor', E'{split_workflow_condor}', 'pims_split')
+        """
+    )
+    op.execute(
+        f"""
+        INSERT INTO workflow_templates (filename, content, workflow_name)
+        VALUES ('split_workflow.sh', E'{split_workflow_sh}', 'pims_split')
+        """
+    )
+    op.execute(
+        f"""
+        INSERT INTO workflow_templates (filename, content, workflow_name)
+        VALUES ('write_finished_file.condor', E'{write_finished_file_condor}', 'pims_split')
+        """
+    )
+    op.execute(
+        f"""
+        INSERT INTO workflow_templates (filename, content, workflow_name)
+        VALUES ('write_finished_file.sh', E'{write_finished_file_sh}', 'pims_split')
+        """
+    )
    pass
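Only upgrade() appears in this hunk; the matching downgrade() would normally delete the same six rows. A minimal sketch of the assumed counterpart (not shown in this commit):

# Assumed shape of the matching downgrade(); it is not part of this diff.
def downgrade():
    op.execute(
        """
        DELETE FROM workflow_templates
        WHERE workflow_name = 'pims_split'
        AND filename IN ('restore_cms.condor', 'restore_cms.sh',
                         'split_workflow.condor', 'split_workflow.sh',
                         'write_finished_file.condor', 'write_finished_file.sh')
        """
    )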