Skip to content
Snippets Groups Projects
Commit 4c85488d authored by Daniel Nemergut's avatar Daniel Nemergut
Browse files

Merge branch 'ws1966-mark4_import_workflow' into 'main'

WS-1966 Mark4 import workflow

See merge request !1455
parents 264ec602 43c0c932
No related branches found
No related tags found
1 merge request!1455WS-1966 Mark4 import workflow
Pipeline #11866 passed
......@@ -62,7 +62,7 @@ metadata_json = """{
def upgrade():
op.execute(
f"""
INSERT INTO workflows (workflow_name) VALUES (E'{wf_name}')
INSERT INTO workflows (workflow_name, requires_lustre) VALUES (E'{wf_name}', true)
"""
)
......
"""mark4_import workflow
Revision ID: 61cbcd1d83f7
Revises: 91091612b6d0
Create Date: 2023-09-27 09:46:03.860605
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '61cbcd1d83f7'
down_revision = '91091612b6d0'
branch_labels = None
depends_on = None
wf_name = "mk_four_import"
mk_four_import_condor = """executable = mk_four_import.sh
arguments = {{data_src}}
output = import.out
error = import.err
log = condor.log
BIN_PATH = $ENV(HOME)/workflows/$ENV(CAPO_PROFILE)/bin
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, $(BIN_PATH)/mark4_import
+WantIOProxy = True
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
mk_four_import_sh = """#!/bin/sh
set -o errexit
./mark4_import
"""
def upgrade():
op.execute(
f"""
INSERT INTO workflows (workflow_name) VALUES (E'{wf_name}')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('mk_four_import.condor', E'{mk_four_import_condor}', E'{wf_name}')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('mk_four_import.sh', E'{mk_four_import_sh}', E'{wf_name}')
"""
)
def downgrade():
op.execute(
f"""
DELETE FROM workflow_templates WHERE workflow_name = E'{wf_name}'
"""
)
op.execute(
f"""
DELETE FROM workflows WHERE workflow_name = E'{wf_name}'
"""
)
"""mark4 workflow renaming
Revision ID: 91091612b6d0
Revises: d20ceed949b3
Create Date: 2023-09-22 13:34:54.054114
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '91091612b6d0'
down_revision = 'd20ceed949b3'
branch_labels = None
depends_on = None
old_ingest_mk_four_obs_sh = """#!/bin/sh
set -o errexit
# Get NGAS hosts and set up variables to randomly select one
NGASHOSTSTR=$(./pycapo -q archive-ingestion.NGASHosts)
NGASHOSTARR=(`/bin/echo ${NGASHOSTSTR}`) # Put the space-delimited host list into an array
NGASHOSTLEN=${#NGASHOSTARR[@]}
# Copy from the difx area to the Workspaces staging area
WSSTAGINGDIR=$(./pycapo -q edu.nrao.workspaces.IngestionSettings.stagingDirectory)
/bin/cp -r $1 $WSSTAGINGDIR
OBSDIR=$(/bin/basename $1)
/bin/chmod -R 750 $WSSTAGINGDIR/$OBSDIR # Make sure NGAS has permissions to ingest the files
cd $WSSTAGINGDIR/$OBSDIR
for FILE in *; do
# Pick random NGAS host to distribute the ingestion load
NGASINDEX=$(($RANDOM % $NGASHOSTLEN))
NGASHOST=${NGASHOSTARR[$NGASINDEX]}
FULLPATH=$(/bin/readlink -f $FILE)
NGASCMD="${NGASHOST}ARCHIVE?filename=file://${FULLPATH}"
INGESTOUT=$(/bin/curl $NGASCMD)
if echo $INGESTOUT | grep -i "error"; then
echo "Failed to ingest ${FILE}"
exit 1
fi
done"""
# Workflow will rename the directory to indicate the ingestion status
new_ingest_mk_four_obs_sh = """#!/bin/sh
set -o errexit
ingested=$1.ingested
failed=$1.failed
# Get NGAS hosts and set up variables to randomly select one
NGASHOSTSTR=$(./pycapo -q archive-ingestion.NGASHosts)
NGASHOSTARR=(`/bin/echo ${NGASHOSTSTR}`) # Put the space-delimited host list into an array
NGASHOSTLEN=${#NGASHOSTARR[@]}
# Copy from the difx area to the Workspaces staging area
WSSTAGINGDIR=$(./pycapo -q edu.nrao.workspaces.IngestionSettings.stagingDirectory)
/bin/cp -r $1 $WSSTAGINGDIR
OBSDIR=$(/bin/basename $1)
/bin/chmod -R 750 $WSSTAGINGDIR/$OBSDIR # Make sure NGAS has permissions to ingest the files
cd $WSSTAGINGDIR/$OBSDIR
for FILE in *; do
# Pick random NGAS host to distribute the ingestion load
NGASINDEX=$(($RANDOM % $NGASHOSTLEN))
NGASHOST=${NGASHOSTARR[$NGASINDEX]}
FULLPATH=$(/bin/readlink -f $FILE)
NGASCMD="${NGASHOST}ARCHIVE?filename=file://${FULLPATH}"
INGESTOUT=$(/bin/curl $NGASCMD)
if echo $INGESTOUT | grep -i "error"; then
echo "Failed to ingest ${FILE}"
mv $1 $failed
exit 1
fi
done
mv $1 $ingested
"""
def upgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{new_ingest_mk_four_obs_sh}'
WHERE filename='ingest_mk_four_obs.sh'
"""
)
def downgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_ingest_mk_four_obs_sh}'
WHERE filename='ingest_mk_four_obs.sh'
"""
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment