Commit b81ce761 authored by Daniel Nemergut
Correcting the pims_split metadata and dag files to iterate over the splits

parent 877ff79f
# Copyright (C) 2023 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
#
"""pims split quicklook corrections
Revision ID: 762c98a8adf1
Revises: e00812d93608
Create Date: 2023-04-14 12:25:42.235329
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "762c98a8adf1"
down_revision = "e00812d93608"
branch_labels = None
depends_on = None
"""
Iterating over {{#splits}} to make the "splits" array which has the format:
"splits": [
{
"split_dir": {{split_dir}},
"quicklook_url": {{quicklook_url}}
},
]
"""
old_metadata = """{"systemId": "{{request_id}}", "fileSetIds": ["{{sdmId}}", "{{calSdmId}}"], "creationTime": "{{created_at}}", "workflowName": "pims_split", "productLocator": "{{product_locator}}", "calProductLocator": "{{cal_locator}}", "destinationDirectory": "{{root_directory}}/{{relative_path}}", "token": "{{token}}", "splits": ["{{split_dir}}", "{{quicklook_url}}"], "casaHome": "{{casaHome}}", "data_location": "{{data_location}}", "vlass_product": "{{vlass_product}}", "existing_restore": "{{existing_restore}}", "need_project_metadata": "{{need_project_metadata}}"}"""
new_metadata = """{"systemId": "{{request_id}}", "fileSetIds": ["{{sdmId}}", "{{calSdmId}}"], "creationTime": "{{created_at}}", "workflowName": "pims_split", "productLocator": "{{product_locator}}", "calProductLocator": "{{cal_locator}}", "destinationDirectory": "{{root_directory}}/{{relative_path}}", "token": "{{token}}", "splits": [{{#splits}}{"split_dir": "{{split_dir}}", "quicklook_url": "{{quicklook_url}}"},{{/splits}}], "casaHome": "{{casaHome}}", "data_location": "{{data_location}}", "vlass_product": "{{vlass_product}}", "existing_restore": "{{existing_restore}}", "need_project_metadata": "{{need_project_metadata}}"}"""
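
# Illustration only, not executed by this migration: a Mustache-style renderer
# (chevron is used here as a hypothetical stand-in for the workflow service's
# actual template engine) expands the {{#splits}}...{{/splits}} section once
# per entry in the "splits" list:
#
#     import chevron
#
#     rendered = chevron.render(
#         new_metadata,
#         {"splits": [{"split_dir": "tile1", "quicklook_url": "http://ql/1"},
#                     {"split_dir": "tile2", "quicklook_url": "http://ql/2"}]},
#     )
#     # -> ... "splits": [{"split_dir": "tile1", "quicklook_url": "http://ql/1"},
#     #                   {"split_dir": "tile2", "quicklook_url": "http://ql/2"},], ...
#
# Note that the section leaves a trailing comma after the last element, which a
# strict JSON parser would reject, so presumably the consumer tolerates it.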
# Conditionalize the quicklook_url argument in the condor file
old_condor_args = 'arguments = "$(split_dir)" "$(quicklook_url)"'
new_condor_args = 'arguments = "$(split_dir)"{{#quicklook_url}} "$(quicklook_url)"{{/quicklook_url}}'
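
# Illustration only: under Mustache semantics the new {{#quicklook_url}} section
# renders its body only when quicklook_url is present and non-empty, so splits
# without a quicklook URL no longer receive an empty second argument:
#
#     import chevron  # hypothetical renderer, as above
#
#     chevron.render(new_condor_args, {"quicklook_url": "http://ql/1"})
#     # -> 'arguments = "$(split_dir)" "$(quicklook_url)"'
#     chevron.render(new_condor_args, {})
#     # -> 'arguments = "$(split_dir)"'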
# Add that pesky missing comma between the transfer_input_files entries
old_write_finished_file_condor = """executable = write_finished_file.sh
output = write_finished.out
error = write_finished.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo nraorsync://$(SBIN_PATH)/pims_analyzer
transfer_output_files = .job.ad
+nrao_output_files = "finished"
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = (VLASS == True) && (HasLustre == True)
+partition = "VLASS"
queue
"""
new_write_finished_file_condor = """executable = write_finished_file.sh
output = write_finished.out
error = write_finished.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/pims_analyzer
transfer_output_files = .job.ad
+nrao_output_files = "finished"
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = (VLASS == True) && (HasLustre == True)
+partition = "VLASS"
queue
"""
old_write_finished_file_sh = b"""#!/bin/sh
cd {{data_location}}
# Set up for emails
ADDRESS_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.vlassAnalystEmail"
ADDRESS=$(./pycapo ${ADDRESS_CAPO_PROPERTY} | cut -d "'" -f 2)
NOTIFICATION_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.serviceUrl"
NOTIFICATION_URL=$(./pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d "'" -f 2)/pims_notification/send
ANALYZER_JSON=$(./pims_analyzer --id {{request_id}} 2> analyzer_call.log)
# The analyzer call failed
if [[ $? -ne 0 ]] ; then
FAIL_MESSAGE="Error getting metadata for pims job, check {{data_location}}/analyzer_call.log for more information"
FAIL_SUBJECT="Failure to analyze pims_split for {{vlass_product}}"
FAIL_JSON="{"destination_email": "$ADDRESS", "subject": "$FAIL_SUBJECT", "message": "$FAIL_MESSAGE"}"
FAIL_NOTIFICATION_URL=$(./pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d "'" -f 2)/email/send
/bin/curl --location --request POST $FAIL_NOTIFICATION_URL --header 'Content-Type: application/json' --data-raw "$FAIL_JSON"
exit 1
fi
# Append address information to the analyzer JSON
JSON="${ANALYZER_JSON%\\}}"destination_email": "$ADDRESS"}"
# Send the email
/bin/curl --location --request POST $NOTIFICATION_URL --header 'Content-Type: application/json' --data-raw "$JSON"
/bin/date > finished
"""
new_write_finished_file_sh = b"""#!/bin/sh
cd {{data_location}}
# Set up for emails
ADDRESS_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.vlassAnalystEmail"
ADDRESS=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${ADDRESS_CAPO_PROPERTY} | cut -d '"' -f 2)
NOTIFICATION_CAPO_PROPERTY="edu.nrao.workspaces.NotificationSettings.serviceUrl"
NOTIFICATION_URL=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d '"' -f 2)/notify/pims_notification/send
ANALYZER_JSON=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pims_analyzer --id {{request_id}} 2> analyzer_call.err)
# The analyzer call failed
if [[ $? -ne 0 ]] ; then
FAIL_MESSAGE="Error getting metadata for pims job, check {{data_location}}/analyzer_call.log for more information"
FAIL_SUBJECT="Failure to analyze pims_split for {{vlass_product}}"
FAIL_JSON="{"destination_email": "$ADDRESS", "subject": "$FAIL_SUBJECT", "message": "$FAIL_MESSAGE"}"
FAIL_NOTIFICATION_URL=$(/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin/pycapo ${NOTIFICATION_CAPO_PROPERTY} | cut -d '"' -f 2)/email/send
/bin/curl --location --request POST $FAIL_NOTIFICATION_URL --header 'Content-Type: application/json' --data "$FAIL_JSON"
exit 1
fi
# Append address information to the analyzer JSON
JSON="${ANALYZER_JSON%\\}}"
JSON+=",\\"destination_email\\":\\"$ADDRESS\\"}"
# Send the email
/bin/curl --location --request POST $NOTIFICATION_URL --header 'Content-Type: application/json' --data "$JSON"
/bin/date > finished
"""

def upgrade():
    op.execute(
        f"""
        UPDATE workflow_templates
        SET content = E'{new_metadata}'
        WHERE workflow_name = 'pims_split' AND filename = 'metadata.json'
        """
    )
    op.execute(
        f"""
        UPDATE workflow_templates
        SET content = replace(convert_from(content, 'utf8'), E'{old_condor_args}', E'{new_condor_args}')::bytea
        WHERE workflow_name = 'pims_split' AND filename = 'split.condor'
        """
    )
    op.execute(
        f"""
        UPDATE workflow_templates
        SET content = E'{new_write_finished_file_condor}'
        WHERE workflow_name = 'pims_split' AND filename = 'write_finished_file.condor'
        """
    )
    conn = op.get_bind()
    conn.execute(
        """
        UPDATE workflow_templates
        SET content = %s WHERE filename = 'write_finished_file.sh'
        """,
        new_write_finished_file_sh,
    )
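
# Editorial note: the .sh templates are bytes literals full of quotes and
# backslashes, so upgrade() and downgrade() pass them to the driver as a bind
# parameter (%s under psycopg2, assumed here) instead of splicing them into an
# E'...' literal the way the text templates above are. Note also that these
# two UPDATEs filter on filename alone, without the workflow_name =
# 'pims_split' guard used above.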

def downgrade():
    op.execute(
        f"""
        UPDATE workflow_templates
        SET content = E'{old_metadata}'
        WHERE workflow_name = 'pims_split' AND filename = 'metadata.json'
        """
    )
    op.execute(
        f"""
        UPDATE workflow_templates
        SET content = replace(convert_from(content, 'utf8'), E'{new_condor_args}', E'{old_condor_args}')::bytea
        WHERE workflow_name = 'pims_split' AND filename = 'split.condor'
        """
    )
    op.execute(
        f"""
        UPDATE workflow_templates
        SET content = E'{old_write_finished_file_condor}'
        WHERE workflow_name = 'pims_split' AND filename = 'write_finished_file.condor'
        """
    )
    conn = op.get_bind()
    conn.execute(
        """
        UPDATE workflow_templates
        SET content = %s WHERE filename = 'write_finished_file.sh'
        """,
        old_write_finished_file_sh,
    )