diff --git a/shared/workspaces/alembic/versions/491102d56809_add_ingest_obs_workflow.py b/shared/workspaces/alembic/versions/491102d56809_add_ingest_obs_workflow.py
index 030e921c8b1f9a7a61e44d2e9900268cab0cfe69..e52410afdb132611d6caca99ce87d9c1621e85e4 100644
--- a/shared/workspaces/alembic/versions/491102d56809_add_ingest_obs_workflow.py
+++ b/shared/workspaces/alembic/versions/491102d56809_add_ingest_obs_workflow.py
@@ -62,7 +62,7 @@ metadata_json = """{
 def upgrade():
     op.execute(
         f"""
-        INSERT INTO workflows (workflow_name) VALUES (E'{wf_name}')
+        INSERT INTO workflows (workflow_name, requires_lustre) VALUES (E'{wf_name}', true)
         """
     )
 
diff --git a/shared/workspaces/alembic/versions/61cbcd1d83f7_mark4_import_workflow.py b/shared/workspaces/alembic/versions/61cbcd1d83f7_mark4_import_workflow.py
new file mode 100644
index 0000000000000000000000000000000000000000..38c6b9489a395104ee36a4c027853e6284829470
--- /dev/null
+++ b/shared/workspaces/alembic/versions/61cbcd1d83f7_mark4_import_workflow.py
@@ -0,0 +1,93 @@
+"""mark4_import workflow
+
+Revision ID: 61cbcd1d83f7
+Revises: 91091612b6d0
+Create Date: 2023-09-27 09:46:03.860605
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '61cbcd1d83f7'
+down_revision = '91091612b6d0'
+branch_labels = None
+depends_on = None
+
+# Name of the workflow row created here; the template rows below are keyed to it.
+wf_name = "mk_four_import"
+
+# HTCondor submit description stored as the 'mk_four_import.condor' template.
+# The {{...}} placeholders are stored verbatim; this migration does not fill them in.
+mk_four_import_condor = """executable = mk_four_import.sh
+arguments = {{data_src}}
+
+output = import.out
+error = import.err
+log = condor.log
+
+BIN_PATH = $ENV(HOME)/workflows/$ENV(CAPO_PROFILE)/bin
+SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
+SPOOL_DIR = {{spool_dir}}
+should_transfer_files = yes
+transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, $(BIN_PATH)/mark4_import
++WantIOProxy = True
+
+request_memory = {{ramInGb}}
+getenv = True
+environment = "CAPO_PATH=/home/casa/capo"
+
+queue
+
+"""
+
+# Wrapper script stored as the 'mk_four_import.sh' template.
+# NOTE(review): the submit description above passes {{data_src}} as an argument,
+# but this script runs ./mark4_import without forwarding $1 -- confirm that is intended.
+mk_four_import_sh = """#!/bin/sh
+set -o errexit
+
+./mark4_import
+
+"""
+
+
+def upgrade():
+    """Insert the mk_four_import workflow row and its condor and shell templates."""
+    # The values are spliced into SQL E'...' escape-string literals by the f-strings,
+    # so the template text must not contain an unescaped single quote or backslash.
+    op.execute(
+        f"""
+        INSERT INTO workflows (workflow_name) VALUES (E'{wf_name}')
+        """
+    )
+
+    op.execute(
+        f"""
+        INSERT INTO workflow_templates (filename, content, workflow_name)
+        VALUES ('mk_four_import.condor', E'{mk_four_import_condor}', E'{wf_name}')
+        """
+    )
+
+    op.execute(
+        f"""
+        INSERT INTO workflow_templates (filename, content, workflow_name)
+        VALUES ('mk_four_import.sh', E'{mk_four_import_sh}', E'{wf_name}')
+        """
+    )
+
+
+def downgrade():
+    """Delete the mk_four_import templates, then the workflow row itself."""
+    op.execute(
+        f"""
+        DELETE FROM workflow_templates WHERE workflow_name = E'{wf_name}'
+        """
+    )
+
+    op.execute(
+        f"""
+        DELETE FROM workflows WHERE workflow_name = E'{wf_name}'
+        """
+    )
diff --git a/shared/workspaces/alembic/versions/91091612b6d0_mark4_workflow_renaming.py b/shared/workspaces/alembic/versions/91091612b6d0_mark4_workflow_renaming.py
new file mode 100644
index 0000000000000000000000000000000000000000..df464424fdba9bb6229f8c4f4e76e315bc2957fc
--- /dev/null
+++ b/shared/workspaces/alembic/versions/91091612b6d0_mark4_workflow_renaming.py
@@ -0,0 +1,113 @@
+"""mark4 workflow renaming
+
+Revision ID: 91091612b6d0
+Revises: d20ceed949b3
+Create Date: 2023-09-22 13:34:54.054114
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '91091612b6d0'
+down_revision = 'd20ceed949b3'
+branch_labels = None
+depends_on = None
+
+
+# Script content that downgrade() writes back: it ingests the files but never renames $1.
+old_ingest_mk_four_obs_sh = """#!/bin/sh
+set -o errexit
+
+# Get NGAS hosts and set up variables to randomly select one
+NGASHOSTSTR=$(./pycapo -q archive-ingestion.NGASHosts)
+NGASHOSTARR=(`/bin/echo ${NGASHOSTSTR}`) # Put the space-delimited host list into an array
+NGASHOSTLEN=${#NGASHOSTARR[@]}
+
+# Copy from the difx area to the Workspaces staging area
+WSSTAGINGDIR=$(./pycapo -q edu.nrao.workspaces.IngestionSettings.stagingDirectory)
+/bin/cp -r $1 $WSSTAGINGDIR
+
+OBSDIR=$(/bin/basename $1)
+/bin/chmod -R 750 $WSSTAGINGDIR/$OBSDIR # Make sure NGAS has permissions to ingest the files
+cd $WSSTAGINGDIR/$OBSDIR
+
+for FILE in *; do
+    # Pick random NGAS host to distribute the ingestion load
+    NGASINDEX=$(($RANDOM % $NGASHOSTLEN))
+    NGASHOST=${NGASHOSTARR[$NGASINDEX]}
+
+    FULLPATH=$(/bin/readlink -f $FILE)
+    NGASCMD="${NGASHOST}ARCHIVE?filename=file://${FULLPATH}"
+
+    INGESTOUT=$(/bin/curl $NGASCMD)
+    if echo $INGESTOUT | grep -i "error"; then
+        echo "Failed to ingest ${FILE}"
+        exit 1
+    fi
+done"""
+
+# Script content that upgrade() installs: the source directory $1 is renamed to
+# $1.ingested once every file is accepted, or to $1.failed when an ingestion fails.
+# A curl failure is folded into the error check; otherwise errexit would end the
+# script on the assignment and the directory would be left without a status suffix.
+new_ingest_mk_four_obs_sh = """#!/bin/sh
+set -o errexit
+
+ingested=$1.ingested
+failed=$1.failed
+
+# Get NGAS hosts and set up variables to randomly select one
+NGASHOSTSTR=$(./pycapo -q archive-ingestion.NGASHosts)
+NGASHOSTARR=(`/bin/echo ${NGASHOSTSTR}`) # Put the space-delimited host list into an array
+NGASHOSTLEN=${#NGASHOSTARR[@]}
+
+# Copy from the difx area to the Workspaces staging area
+WSSTAGINGDIR=$(./pycapo -q edu.nrao.workspaces.IngestionSettings.stagingDirectory)
+/bin/cp -r $1 $WSSTAGINGDIR
+
+OBSDIR=$(/bin/basename $1)
+/bin/chmod -R 750 $WSSTAGINGDIR/$OBSDIR # Make sure NGAS has permissions to ingest the files
+cd $WSSTAGINGDIR/$OBSDIR
+
+for FILE in *; do
+    # Pick random NGAS host to distribute the ingestion load
+    NGASINDEX=$(($RANDOM % $NGASHOSTLEN))
+    NGASHOST=${NGASHOSTARR[$NGASINDEX]}
+
+    FULLPATH=$(/bin/readlink -f $FILE)
+    NGASCMD="${NGASHOST}ARCHIVE?filename=file://${FULLPATH}"
+
+    INGESTOUT=$(/bin/curl $NGASCMD) || INGESTOUT="curl error, exit status $?"
+    if echo $INGESTOUT | grep -i "error"; then
+        echo "Failed to ingest ${FILE}"
+        mv $1 $failed
+        exit 1
+    fi
+done
+
+mv $1 $ingested
+"""
+
+
+def upgrade():
+    """Replace the ingest_mk_four_obs.sh template content with the renaming script."""
+    op.execute(
+        f"""
+        UPDATE workflow_templates
+        SET content=E'{new_ingest_mk_four_obs_sh}'
+        WHERE filename='ingest_mk_four_obs.sh'
+        """
+    )
+
+
+def downgrade():
+    """Restore the previous ingest_mk_four_obs.sh template content."""
+    op.execute(
+        f"""
+        UPDATE workflow_templates
+        SET content=E'{old_ingest_mk_four_obs_sh}'
+        WHERE filename='ingest_mk_four_obs.sh'
+        """
+    )