Commit a19af00d authored by Daniel Nemergut

Merge branch 'htc10_wf_template_fixes' into '2.8.3-DEVELOPMENT'

Corrected pims_split templates

See merge request !1588
parents 20e36516 cc48621a
2 merge requests: !1607 (merge 2.8.3 to main), !1588 (Corrected pims_split templates)
Pipeline #14680 passed
Showing 341 additions and 0 deletions
"""htc10 workflow fixes
Revision ID: d992b379d8a7
Revises: 36591dc3f14d
Create Date: 2024-02-29 12:37:02.423565
"""
from alembic import op
from pathlib import Path
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'd992b379d8a7'
down_revision = '36591dc3f14d'
branch_labels = None
depends_on = None
wf_name = 'pims_split'
# Read the template as text and encode it to bytes so no characters are lost in the text -> bytea conversion
def set_wf_content(filename: str) -> bytes:
return (Path.cwd() / "versions" / "templates" / wf_name / filename).read_text().encode()
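# Note: Path.cwd() resolves relative to the process working directory, so this
# migration assumes `alembic upgrade` is run from the directory that contains
# versions/ (an assumption based on the code above, not verified in this diff).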
def upgrade():
conn = op.get_bind()
# Remove quotes around string variables that have already been quoted
conn.execute(
f"""
UPDATE workflow_templates
SET content = %s
WHERE filename = 'split.condor' and workflow_name = E'{wf_name}'
""",
set_wf_content("split_condor_2.8.3.txt"),
)
# Restore the update_stage calls that went missing after 2.8.1
conn.execute(
f"""
UPDATE workflow_templates
SET content = %s
WHERE filename = 'restore.sh' and workflow_name = E'{wf_name}'
""",
set_wf_content("restore_sh_2.8.3.txt"),
)
conn.execute(
f"""
UPDATE workflow_templates
SET content = %s
WHERE filename = 'split.sh' and workflow_name = E'{wf_name}'
""",
set_wf_content("split_sh_2.8.3.txt"),
)
conn.execute(
f"""
UPDATE workflow_templates
SET content = %s
WHERE filename = 'write_finished_file.sh' and workflow_name = E'{wf_name}'
""",
set_wf_content("finish_sh_2.8.3.txt"),
)
def downgrade():
conn = op.get_bind()
conn.execute(
f"""
UPDATE workflow_templates
SET content = %s
WHERE filename = 'split.condor' and workflow_name = E'{wf_name}'
""",
set_wf_content("split_condor_2.8.2.txt"),
)
conn.execute(
f"""
UPDATE workflow_templates
SET content = %s
WHERE filename = 'restore.sh' and workflow_name = E'{wf_name}'
""",
set_wf_content("restore_sh_2.8.2.txt"),
)
conn.execute(
f"""
UPDATE workflow_templates
SET content = %s
WHERE filename = 'split.sh' and workflow_name = E'{wf_name}'
""",
set_wf_content("split_sh_2.8.2.txt"),
)
conn.execute(
f"""
UPDATE workflow_templates
SET content = %s
WHERE filename = 'write_finished_file.sh' and workflow_name = E'{wf_name}'
""",
set_wf_content("finish_sh_2.8.2.txt"),
)
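For reference, this revision can be applied or rolled back with the standard Alembic CLI. A minimal sketch, assuming an alembic.ini configured for the workspaces database (the actual deployment invocation is not part of this diff):
alembic upgrade d992b379d8a7    # install the corrected 2.8.3 templates
alembic downgrade 36591dc3f14d  # revert to the 2.8.2 templates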

versions/templates/pims_split/finish_sh_2.8.2.txt:
#!/bin/sh
cd {{data_location}}
# Set up for emails
ADDRESS_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.vlassAnalystEmail"
ADDRESS=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${ADDRESS_CAPO_PROPERTY} | cut -d '"' -f 2)
NOTIFICATION_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.serviceUrl"
NOTIFICATION_URL=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d '"' -f 2)/notify/pims_notification/send
ANALYZER_JSON=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pims_analyzer --id {{request_id}} 2> analyzer_call.err)
# The analyzer call failed
if [[ $? -ne 0 ]] ; then
FAIL_MESSAGE="Error getting metadata for pims job, check {{data_location}}/analyzer_call.err for more information"
FAIL_SUBJECT="Failure to analyze pims_split for {{vlass_product}}"
FAIL_JSON="{\"destination_email\": \"$ADDRESS\", \"subject\": \"$FAIL_SUBJECT\", \"message\": \"$FAIL_MESSAGE\"}"
FAIL_NOTIFICATION_URL=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d '"' -f 2)/email/send
/bin/curl --location --request POST $FAIL_NOTIFICATION_URL --header 'Content-Type: application/json' --data "$FAIL_JSON"
exit 1
fi
# Append address information to the analyzer JSON
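# ${ANALYZER_JSON%\}} is a suffix-strip expansion: it removes the trailing '}'
# so the destination_email field can be appended before re-closing the object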
JSON="${ANALYZER_JSON%\}}"
JSON+=",\"destination_email\":\"$ADDRESS\"}"
# Send the email
/bin/curl --location --request POST $NOTIFICATION_URL --header 'Content-Type: application/json' --data "$JSON"
/bin/date > finished

versions/templates/pims_split/finish_sh_2.8.3.txt:
#!/bin/sh
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
${SBIN_PATH}/update_stage FINISH
cd {{data_location}}
# Set up for emails
ADDRESS_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.vlassAnalystEmail"
ADDRESS=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${ADDRESS_CAPO_PROPERTY} | cut -d '"' -f 2)
NOTIFICATION_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.serviceUrl"
NOTIFICATION_URL=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d '"' -f 2)/notify/pims_notification/send
ANALYZER_JSON=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pims_analyzer --id {{request_id}} 2> analyzer_call.err)
# The analyzer call failed
if [[ $? -ne 0 ]] ; then
FAIL_MESSAGE="Error getting metadata for pims job, check {{data_location}}/analyzer_call.err for more information"
FAIL_SUBJECT="Failure to analyze pims_split for {{vlass_product}}"
FAIL_JSON="{\"destination_email\": \"$ADDRESS\", \"subject\": \"$FAIL_SUBJECT\", \"message\": \"$FAIL_MESSAGE\"}"
FAIL_NOTIFICATION_URL=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d '"' -f 2)/email/send
/bin/curl --location --request POST $FAIL_NOTIFICATION_URL --header 'Content-Type: application/json' --data "$FAIL_JSON"
exit 1
fi
# Append address information to the analyzer JSON
JSON="${ANALYZER_JSON%\}}"
JSON+=",\"destination_email\":\"$ADDRESS\"}"
# Send the email
/bin/curl --location --request POST $NOTIFICATION_URL --header 'Content-Type: application/json' --data "$JSON"
/bin/date > finished
${SBIN_PATH}/update_stage FINISH --complete

versions/templates/pims_split/restore_sh_2.8.2.txt:
#!/bin/sh
export HOME={{spool_dir}}
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
cd {{spool_dir}}
{{^existing_restore}}
chmod 770 .
cd rawdata/
$SBIN_PATH/productfetcher --product-locator $1 --product-locator $2
cd ../
$SBIN_PATH/casa_envoy --restore -c metadata.json PPR.xml
{{/existing_restore}}
{{#existing_restore}}
$SBIN_PATH/null -n
{{/existing_restore}}

versions/templates/pims_split/restore_sh_2.8.3.txt:
#!/bin/sh
export HOME={{spool_dir}}
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
${SBIN_PATH}/update_stage RESTORE
cd {{spool_dir}}
{{^existing_restore}}
chmod 770 .
cd rawdata/
$SBIN_PATH/productfetcher --product-locator $1 --product-locator $2
cd ../
$SBIN_PATH/casa_envoy --restore -c metadata.json PPR.xml
{{/existing_restore}}
{{#existing_restore}}
$SBIN_PATH/null -n
{{/existing_restore}}
${SBIN_PATH}/update_stage RESTORE --complete
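The {{^existing_restore}} ... {{/existing_restore}} block is a Mustache-style inverted section: its body renders only when existing_restore is empty or unset, while the {{#existing_restore}} section renders only when a value is present. As a sketch with hypothetical values (the real renderer lives in the workflow service and is not part of this diff), the 2.8.3 template above renders for a fresh restore as:
#!/bin/sh
export HOME=/lustre/example/spool    # hypothetical spool_dir
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
${SBIN_PATH}/update_stage RESTORE
cd /lustre/example/spool
chmod 770 .
cd rawdata/
$SBIN_PATH/productfetcher --product-locator $1 --product-locator $2
cd ../
$SBIN_PATH/casa_envoy --restore -c metadata.json PPR.xml
${SBIN_PATH}/update_stage RESTORE --complete
With existing_restore set, only the $SBIN_PATH/null -n no-op remains between the two update_stage calls.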

versions/templates/pims_split/split_condor_2.8.2.txt:
executable = split.sh
arguments = "$(split_dir)"{{#quicklook_url}} "$(quicklook_url)"{{/quicklook_url}}
output = tiles/$(split_dir)/split.out
error = tiles/$(split_dir)/split.err
log = condor.log
SPLIT_DIR = "$(split_dir)"
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
PIMS_PATH = /lustre/aoc/cluster/pipeline/vlass_{{vlass_env}}/workspaces/bin
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/update_stage, nraorsync://$(PIMS_PATH)/pimscache, nraorsync://$(SPOOL_DIR)/metadata.json, nraorsync://$(SPOOL_DIR)/tiles/$(SPLIT_DIR)/PPR.xml, nraorsync://$(SPOOL_DIR)/tiles/$(SPLIT_DIR)/working, nraorsync://$(SPOOL_DIR)/tiles/$(SPLIT_DIR)/rawdata, nraorsync://$(SPOOL_DIR)/tiles/$(SPLIT_DIR)/products
when_to_transfer_output = ON_EXIT
transfer_output_files = .job.ad
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
request_memory = 24G
request_disk = 20G
requirements = (VLASS == True) && (HasLustre == True)
+partition = "VLASS"
queue

versions/templates/pims_split/split_condor_2.8.3.txt:
executable = split.sh
arguments = $(split_dir){{#quicklook_url}} $(quicklook_url){{/quicklook_url}}
output = tiles/$(split_dir)/split.out
error = tiles/$(split_dir)/split.err
log = condor.log
SPLIT_DIR = $(split_dir)
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
PIMS_PATH = /lustre/aoc/cluster/pipeline/vlass_{{vlass_env}}/workspaces/bin
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/update_stage, nraorsync://$(PIMS_PATH)/pimscache, nraorsync://$(SPOOL_DIR)/metadata.json, nraorsync://$(SPOOL_DIR)/tiles/$(SPLIT_DIR)/PPR.xml, nraorsync://$(SPOOL_DIR)/tiles/$(SPLIT_DIR)/working, nraorsync://$(SPOOL_DIR)/tiles/$(SPLIT_DIR)/rawdata, nraorsync://$(SPOOL_DIR)/tiles/$(SPLIT_DIR)/products
when_to_transfer_output = ON_EXIT
transfer_output_files = .job.ad
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
request_memory = 24G
request_disk = 20G
requirements = (VLASS == True) && (HasLustre == True)
+partition = "VLASS"
queue
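This is the quoting fix named in the commit message: the 2.8.2 submit file wraps $(split_dir) in double quotes both in arguments and in the SPLIT_DIR definition, and HTCondor substitutes those quotes as literal characters wherever the macro is used, corrupting the transfer paths. A sketch with a hypothetical split_dir of T01t01/J000000-000000:
# 2.8.2: SPLIT_DIR = "$(split_dir)"  ->  tiles/"T01t01/J000000-000000"/PPR.xml  (literal quotes in the path)
# 2.8.3: SPLIT_DIR = $(split_dir)    ->  tiles/T01t01/J000000-000000/PPR.xml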

versions/templates/pims_split/split_sh_2.8.2.txt:
#!/bin/sh
export HOME=$TMPDIR
TILE=$(echo $1 | cut -d "/" -f 1)
PHCENTER=$(echo $1 | cut -d "/" -f 2)
# Get the measurement set path
{{^existing_restore}}
MS={{data_location}}/working/*.ms
{{/existing_restore}}
{{#existing_restore}}
MS={{existing_restore}}
{{/existing_restore}}
# Link it into the split's rawdata directory
ln -s $MS rawdata/
# failed_splits.txt must be present for pims_analyzer, even if it's empty
touch {{data_location}}/failed_splits.txt
# Run CASA
./casa_envoy --split metadata.json PPR.xml
# Populate cache
./pimscache cp -c {{vlass_product}} -t $TILE -p $PHCENTER working/*_split.ms
# If pimscache call failed, output the failed split to a file for pims_analyzer
if [[ $? -ne 0 ]] ; then
echo "${TILE}.${PHCENTER}" >> {{data_location}}/failed_splits.txt
fi
# Run quicklook if second parameter was given
if [[ -n "$2" ]]; then
curl --request PUT --header "Content-Length: 0" $2
fi

versions/templates/pims_split/split_sh_2.8.3.txt:
#!/bin/sh
export HOME=$TMPDIR
TILE=$(echo $1 | cut -d "/" -f 1)
PHCENTER=$(echo $1 | cut -d "/" -f 2)
STAGE=$1
# Get the measurement set path
{{^existing_restore}}
MS={{data_location}}/working/*.ms
{{/existing_restore}}
{{#existing_restore}}
MS={{existing_restore}}
{{/existing_restore}}
./update_stage SPLIT_${STAGE}
# Link it into the split's rawdata directory
ln -s $MS rawdata/
# failed_splits.txt must be present for pims_analyzer, even if it's empty
touch {{data_location}}/failed_splits.txt
# Run CASA
./casa_envoy --split metadata.json PPR.xml
# Populate cache
./pimscache cp -c {{vlass_product}} -t $TILE -p $PHCENTER working/*_split.ms
# If pimscache call failed, output the failed split to a file for pims_analyzer
if [[ $? -ne 0 ]] ; then
echo "${TILE}.${PHCENTER}" >> {{data_location}}/failed_splits.txt
fi
# Run quicklook if second parameter was given
if [[ -n "$2" ]]; then
curl --request PUT --header "Content-Length: 0" $2
fi
./update_stage SPLIT_${STAGE} --complete
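For context, HTCondor launches this script with the arguments line from the submit file above, so $1 is the tile/phase-center pair and $2 is the optional quicklook callback URL. A hypothetical invocation (values illustrative only):
./split.sh T01t01/J000000-000000 https://quicklook.example/callback
The two cut calls at the top then yield TILE=T01t01 and PHCENTER=J000000-000000, and the final curl fires only because a second argument was supplied.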