Skip to content
Snippets Groups Projects

WS-677: Multi-stage workflow research & development

Merged Nathan Hertz requested to merge WS-677-dag-support-for-workflows into main
1 file
+ 73
0
Compare changes
  • Side-by-side
  • Inline
"""add DAG-ified null workflow
Revision ID: 990c5cd70082
Revises: 257537f99abc
Create Date: 2021-09-14 15:12:31.154288
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "990c5cd70082"
down_revision = "257537f99abc"
branch_labels = None
depends_on = None
null_dag_script = """#!/bin/sh
./null $*
"""
null_dag_job_submit = """executable = null_dag.sh
arguments = {}
output = null_dag.$(jobname).out
error = null_dag.$(jobname).err
log = null_dag.$(jobname).log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/null
queue
"""
null_dag_dag_submit = """JOB GREET null_dag_greeting.condor
VARS GREET jobname="$(JOB)"
JOB NAP null_dag_nap.condor
VARS NAP jobname="$(JOB)"
JOB VBOSE_GREET null_dag_verbose_greeting.condor
VARS VBOSE_GREET jobname="$(JOB)"
PARENT GREET CHILD NAP
PARENT NAP CHILD VBOSE_GREET
"""
def upgrade():
op.execute("INSERT INTO workflows (workflow_name) VALUES ('null_dag')")
op.execute(
"INSERT INTO workflow_templates (workflow_name, filename, content) "
f"VALUES ('null_dag', 'null_dag.sh', E'{null_dag_script}')"
)
op.execute(
"INSERT INTO workflow_templates (workflow_name, filename, content) "
f"VALUES ('null_dag', 'null_dag_greeting.condor', E'{null_dag_job_submit.format('--greeting')}')"
)
op.execute(
"INSERT INTO workflow_templates (workflow_name, filename, content) "
f"VALUES ('null_dag', 'null_dag_nap.condor', E'{null_dag_job_submit.format('--verbose --nap')}')"
)
op.execute(
"INSERT INTO workflow_templates (workflow_name, filename, content) "
f"VALUES ('null_dag', 'null_dag_verbose_greeting.condor', E'{null_dag_job_submit.format('--verbose --greeting')}')"
)
op.execute(
"INSERT INTO workflow_templates (workflow_name, filename, content) "
f"VALUES ('null_dag', 'null_dag.dag', E'{null_dag_dag_submit}')"
)
def downgrade():
op.execute("DELETE FROM workflows WHERE workflow_name = 'null_dag'")
op.execute("DELETE FROM workflow_templates WHERE workflow_name = 'null_dag'")
Loading