Skip to content
Snippets Groups Projects
Commit 116b761c authored by Nathan Bockisch's avatar Nathan Bockisch
Browse files

Merge branch 'ws-1910-ingest-mark4-workflow' into '2.8.2-DEVELOPMENT'

WS-1910: Added Mark 4 Observation Ingestion Workflow

See merge request !1439
parents 70a5202a d11b9660
No related branches found
No related tags found
2 merge requests!1452Merge 2.8.2 to main,!1439WS-1910: Added Mark 4 Observation Ingestion Workflow
Pipeline #11473 passed
"""add mark4 observation ingestion workflow
Revision ID: 08090cb7acc4
Revises: 705ed8cfc7be
Create Date: 2023-09-11 10:50:44.427048
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '08090cb7acc4'
down_revision = '705ed8cfc7be'
branch_labels = None
depends_on = None
wf_name = "ingest_mk_four_obs"
ingest_mk_four_obs_condor = """executable = ingest_mk_four_obs.sh
arguments = {{data_location}}
output = ingest.out
error = ingest.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo
+WantIOProxy = True
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
queue
"""
ingest_mk_four_obs_sh = """#!/bin/sh
set -o errexit
cd $1
# Get NGAS hosts and set up variables to randomly select one
NGASHOSTSTR=$(./pycapo -q archive-ingestion.NGASHosts)
NGASHOSTARR=(`/bin/echo ${NGASHOSTSTR}`) # Put the space-delimited host list into an array
NGASHOSTLEN=${#NGASHOSTARR[@]}
for FILE in *; do
# Pick random NGAS host to distribute the ingestion load
NGASINDEX=$(($RANDOM % $NGASHOSTLEN))
NGASHOST=${NGASHOSTARR[$NGASINDEX]}
FULLPATH=$(/bin/readlink -f $FILE)
NGASCMD="${NGASHOST}ARCHIVE?filename=file://${FULLPATH}"
/bin/curl $NGASCMD
done"""
def upgrade():
op.execute(
f"""
INSERT INTO workflows (workflow_name, requires_lustre) VALUES (E'{wf_name}', true)
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('ingest_mk_four_obs.condor', E'{ingest_mk_four_obs_condor}', E'{wf_name}')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('ingest_mk_four_obs.sh', E'{ingest_mk_four_obs_sh}', E'{wf_name}')
"""
)
def downgrade():
op.execute(
f"""
DELETE FROM workflow_templates WHERE workflow_name = E'{wf_name}'
"""
)
op.execute(
f"""
DELETE FROM workflows WHERE workflow_name = E'{wf_name}'
"""
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment