Skip to content
Snippets Groups Projects

Correcting the pims_split metadata and dag files to iterate over the splits

Merged Daniel Nemergut requested to merge pims_split_template_fixes into main
@@ -102,6 +102,75 @@ queue
"""
# Original write_finished_file.sh template, kept so downgrade() can restore
# the pre-migration content verbatim.
# NOTE(review): intentionally left byte-identical — it depends on relative
# ./pycapo and ./pims_analyzer binaries inside {{data_location}}, and the
# FAIL_JSON line's inner double quotes are unescaped (the quoting bug this
# migration's new template is meant to address); do not "fix" it here or the
# downgrade would no longer reproduce the historical template.
old_write_finished_file_sh = b"""#!/bin/sh
cd {{data_location}}
# Set up for emails
ADDRESS_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.vlassAnalystEmail"
ADDRESS=$(./pycapo ${ADDRESS_CAPO_PROPERTY} | cut -d "'" -f 2)
NOTIFICATION_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.serviceUrl"
NOTIFICATION_URL=$(./pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d "'" -f 2)/pims_notification/send
ANALYZER_JSON=$(./pims_analyzer --id {{request_id}} 2> analyzer_call.log)
# The analyzer call failed
if [[ $? -ne 0 ]] ; then
FAIL_MESSAGE="Error getting metadata for pims job, check {{data_location}}/analyzer_call.log for more information"
FAIL_SUBJECT="Failure to analyze pims_split for {{vlass_product}}"
FAIL_JSON="{"destination_email": "$ADDRESS", "subject": "$FAIL_SUBJECT", "message": "$FAIL_MESSAGE"}"
FAIL_NOTIFICATION_URL=$(./pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d "'" -f 2)/email/send
/bin/curl --location --request POST $FAIL_NOTIFICATION_URL --header 'Content-Type: application/json' --data-raw "$FAIL_JSON"
exit 1
fi
# Append address information to the analyzer JSON
JSON="${ANALYZER_JSON%\\}}"destination_email": "$ADDRESS"}"
# Send the email
/bin/curl --location --request POST $NOTIFICATION_URL --header 'Content-Type: application/json' --data-raw "$JSON"
/bin/date > finished
"""
# Replacement write_finished_file.sh template installed by upgrade().
# Changes vs. the old template: absolute pycapo/pims_analyzer paths under
# $CAPO_PROFILE, the /notify/ service prefix, and properly escaped JSON.
# Fixes applied in review:
#   - FAIL_JSON now escapes its inner double quotes (\\" in the bytes literal
#     becomes \" in the script, a literal " inside the shell double-quoted
#     string) — previously the unescaped quotes terminated the shell string
#     and curl posted a mangled, invalid JSON body on the failure path.
#   - The failure message points at analyzer_call.err, matching the actual
#     stderr redirect on the pims_analyzer call (it previously said
#     analyzer_call.log, a file this script never writes).
new_write_finished_file_sh = b"""#!/bin/sh
cd {{data_location}}
# Set up for emails
ADDRESS_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.vlassAnalystEmail"
ADDRESS=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${ADDRESS_CAPO_PROPERTY} | cut -d '"' -f 2)
NOTIFICATION_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.serviceUrl"
NOTIFICATION_URL=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d '"' -f 2)/notify/pims_notification/send
ANALYZER_JSON=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pims_analyzer --id {{request_id}} 2> analyzer_call.err)
# The analyzer call failed
if [[ $? -ne 0 ]] ; then
FAIL_MESSAGE="Error getting metadata for pims job, check {{data_location}}/analyzer_call.err for more information"
FAIL_SUBJECT="Failure to analyze pims_split for {{vlass_product}}"
FAIL_JSON="{\\"destination_email\\": \\"$ADDRESS\\", \\"subject\\": \\"$FAIL_SUBJECT\\", \\"message\\": \\"$FAIL_MESSAGE\\"}"
FAIL_NOTIFICATION_URL=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d '"' -f 2)/email/send
/bin/curl --location --request POST $FAIL_NOTIFICATION_URL --header 'Content-Type: application/json' --data "$FAIL_JSON"
exit 1
fi
# Append address information to the analyzer JSON
JSON="${ANALYZER_JSON%\\}}"
JSON+=",\\"destination_email\\":\\"$ADDRESS\\"}"
# Send the email
/bin/curl --location --request POST $NOTIFICATION_URL --header 'Content-Type: application/json' --data "$JSON"
/bin/date > finished
"""
def upgrade():
op.execute(
@@ -117,7 +186,8 @@ def upgrade():
UPDATE workflow_templates
SET content = replace(convert_from(content, 'utf8'), E'{old_condor_args}', E'{new_condor_args}')::bytea
WHERE workflow_name = 'pims_split' AND filename = 'split.condor'
""")
"""
)
op.execute(
f"""
@@ -127,6 +197,15 @@ def upgrade():
"""
)
conn = op.get_bind()
conn.execute(
f"""
UPDATE workflow_templates
SET content = %s WHERE filename='write_finished_file.sh'
""",
new_write_finished_file_sh,
)
def downgrade():
op.execute(
@@ -142,7 +221,8 @@ def downgrade():
UPDATE workflow_templates
SET content = replace(convert_from(content, 'utf8'), E'{new_condor_args}', E'{old_condor_args}')::bytea
WHERE workflow_name = 'pims_split' AND filename = 'split.condor'
""")
"""
)
op.execute(
f"""
@@ -151,3 +231,12 @@ def downgrade():
WHERE workflow_name = 'pims_split' AND filename = 'write_finished_file.condor'
"""
)
conn = op.get_bind()
conn.execute(
f"""
UPDATE workflow_templates
SET content = %s WHERE filename='write_finished_file.sh'
""",
old_write_finished_file_sh,
)
Loading