Skip to content
Snippets Groups Projects
e99b65ee2b4e_add_calibration_ingestion_workflow.py 1.59 KiB
Newer Older
"""add calibration ingestion workflow

Revision ID: e99b65ee2b4e
Revises: 2ac701610fc7
Create Date: 2021-07-06 11:44:30.072590

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "e99b65ee2b4e"
down_revision = "2ac701610fc7"
branch_labels = None
depends_on = None


condor_content = """executable = ingest_cal.sh
arguments = metadata.json

output = ingest.out
error = ingest.err
log = condor.log

SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/conveyor, $(SBIN_PATH)/ingest_envoy, $(SBIN_PATH)/calibration-table-collector.sh, ./metadata.json

getenv = True
environment = "CAPO_PATH=/home/casa/capo"

queue
"""

script_content = """#!/bin/sh
set -o errexit

./conveyor --retrieve $1
./ingest_envoy --ingest-cal $1

"""


def upgrade():
    op.execute(
        """
        INSERT INTO workflows (workflow_name) VALUES ('ingest_cal')
        """
    )

    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name) 
        VALUES ('ingest_cal.condor', E'{condor_content}', 'ingest_cal') 
        """
    )

    op.execute(
        f"""
        INSERT INTO workflow_templates (filename, content, workflow_name) 
        VALUES ('ingest_cal.sh', E'{script_content}', 'ingest_cal') 
        """
    )


def downgrade():
    op.execute(
        """
        DELETE FROM workflows WHERE workflow_name='ingest-cal'
        """
    )

    op.execute(
        """
        DELETE FROM workflow_templates WHERE workflow_name='ingest-cal'
        """
    )