Skip to content
Snippets Groups Projects

WS-1966 Mark4 import workflow

Merged Daniel Nemergut requested to merge ws1966-mark4_import_workflow into main
All threads resolved!
1 file
+ 107
0
Compare changes
  • Side-by-side
  • Inline
"""mark4 workflow renaming
Revision ID: 91091612b6d0
Revises: 569416c40ca8
Create Date: 2023-09-22 13:34:54.054114
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '91091612b6d0'
down_revision = '569416c40ca8'
branch_labels = None
depends_on = None
old_ingest_mk_four_obs_sh = """#!/bin/sh
set -o errexit
# Get NGAS hosts and set up variables to randomly select one
NGASHOSTSTR=$(./pycapo -q archive-ingestion.NGASHosts)
NGASHOSTARR=(`/bin/echo ${NGASHOSTSTR}`) # Put the space-delimited host list into an array
NGASHOSTLEN=${#NGASHOSTARR[@]}
# Copy from the difx area to the Workspaces staging area
WSSTAGINGDIR=$(./pycapo -q edu.nrao.workspaces.IngestionSettings.stagingDirectory)
/bin/cp -r $1 $WSSTAGINGDIR
OBSDIR=$(/bin/basename $1)
/bin/chmod -R 750 $WSSTAGINGDIR/$OBSDIR # Make sure NGAS has permissions to ingest the files
cd $WSSTAGINGDIR/$OBSDIR
for FILE in *; do
# Pick random NGAS host to distribute the ingestion load
NGASINDEX=$(($RANDOM % $NGASHOSTLEN))
NGASHOST=${NGASHOSTARR[$NGASINDEX]}
FULLPATH=$(/bin/readlink -f $FILE)
NGASCMD="${NGASHOST}ARCHIVE?filename=file://${FULLPATH}"
INGESTOUT=$(/bin/curl $NGASCMD)
if echo $INGESTOUT | grep -i "error"; then
echo "Failed to ingest ${FILE}"
exit 1
fi
done"""
# Workflow will rename the directory to indicate the ingestion status
new_ingest_mk_four_obs_sh = """#!/bin/sh
set -o errexit
ingested=$1.ingested
failed=$1.failed
# Get NGAS hosts and set up variables to randomly select one
NGASHOSTSTR=$(./pycapo -q archive-ingestion.NGASHosts)
NGASHOSTARR=(`/bin/echo ${NGASHOSTSTR}`) # Put the space-delimited host list into an array
NGASHOSTLEN=${#NGASHOSTARR[@]}
# Copy from the difx area to the Workspaces staging area
WSSTAGINGDIR=$(./pycapo -q edu.nrao.workspaces.IngestionSettings.stagingDirectory)
/bin/cp -r $1 $WSSTAGINGDIR
OBSDIR=$(/bin/basename $1)
/bin/chmod -R 750 $WSSTAGINGDIR/$OBSDIR # Make sure NGAS has permissions to ingest the files
cd $WSSTAGINGDIR/$OBSDIR
for FILE in *; do
# Pick random NGAS host to distribute the ingestion load
NGASINDEX=$(($RANDOM % $NGASHOSTLEN))
NGASHOST=${NGASHOSTARR[$NGASINDEX]}
FULLPATH=$(/bin/readlink -f $FILE)
NGASCMD="${NGASHOST}ARCHIVE?filename=file://${FULLPATH}"
INGESTOUT=$(/bin/curl $NGASCMD)
if echo $INGESTOUT | grep -i "error"; then
echo "Failed to ingest ${FILE}"
mv $1 $failed
exit 1
fi
done
mv $1 $ingested
"""
def upgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{new_ingest_mk_four_obs_sh}'
WHERE filename='ingest_mk_four_obs.sh'
"""
)
def downgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_ingest_mk_four_obs_sh}'
WHERE filename='ingest_mk_four_obs.sh'
"""
)
Loading