Skip to content
Snippets Groups Projects
Commit 36faf9be authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

fix wf_monitor condor id update bug

parent 02fd1958
No related branches found
Tags end-of-sprint-46
1 merge request!1097fix wf_monitor condor id update bug
Pipeline #6953 passed
......@@ -144,3 +144,22 @@ class WorkflowStatusMessages(Enum):
# used post 'terminated' conversion
COMPLETE = "workflow-complete"
class DAGFinalStage(Enum):
"""
Listing of final DAG stage names for determining when to terminate log reading
"""
CONVEY = "CONVEY"
DELIVER = "DELIVER"
FINISH = "FINISH"
POST = "POST"
@classmethod
def values(cls):
return cls._value2member_map_
@classmethod
def is_final_stage(cls, stage_name: str):
return stage_name in cls.values()
......@@ -36,7 +36,7 @@ from typing import Any, Callable, Dict, Tuple, Union, List
from messaging.messenger import MessageSender
from ._version import ___version___ as VERSION
from .enum import HtcEventCodes, WorkflowStatusMessages
from .enum import HtcEventCodes, WorkflowStatusMessages, DAGFinalStage
# pylint: disable=E0401, E0402, W1203
......@@ -141,13 +141,12 @@ def is_final_stage(body: str) -> bool:
:param body: Body of event log entry
:return: boolean for final stage
"""
final_list = ["CONVEY", "DELIVER", "AUTOQA"]
dag_node = r"DAG Node: (?P<dagnode>[A-Z]{4,})"
dagnode = re.search(dag_node, body)
logger.info(f"current DAG stage: {dagnode.group('dagnode')}")
if dagnode.group("dagnode") in final_list:
if DAGFinalStage.is_final_stage(dagnode.group("dagnode")):
return True
return False
......@@ -309,7 +308,7 @@ class WorkflowMonitor:
job_count += 1
is_final_node = is_final_stage(event["condor_metadata"]["log"])
# DAG is still running, change message to 'continue'
if not is_final_node and job_count > 1:
if not is_final_node:
event["type"] = WorkflowStatusMessages.CONTINUE.value
send_message(event, self.message_router)
......
"""standardize final dag stage
Revision ID: 1aacf11f8ba4
Revises: bccbfba286d7
Create Date: 2022-10-10 14:58:33.989745
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "1aacf11f8ba4"
down_revision = "bccbfba286d7"
branch_labels = None
depends_on = None
new_null_dag = """JOB GREET null_dag_greeting.condor
VARS GREET jobname="$(JOB)"
JOB NAP null_dag_nap.condor
VARS NAP jobname="$(JOB)"
JOB FINISH null_dag_verbose_greeting.condor
VARS FINISH jobname="$(JOB)"
PARENT GREET CHILD NAP
PARENT NAP CHILD FINISH
"""
old_null_dag = """JOB GREET null_dag_greeting.condor
VARS GREET jobname="$(JOB)"
JOB NAP null_dag_nap.condor
VARS NAP jobname="$(JOB)"
JOB VBOSE_GREET null_dag_verbose_greeting.condor
VARS VBOSE_GREET jobname="$(JOB)"
PARENT GREET CHILD NAP
PARENT NAP CHILD VBOSE_GREET
"""
new_ql_dag = """JOB FETCH vlass_ql_fetch.condor
VARS FETCH jobname="$(JOB)"
JOB ENVOY vlass_ql_envoy.condor
VARS ENVOY jobname="$(JOB)"
JOB POST vlass_ql_post.condor
VARS POST jobname="$(JOB)"
PARENT FETCH CHILD ENVOY
PARENT ENVOY CHILD POST
"""
old_ql_dag = """JOB FETCH vlass_ql_fetch.condor
VARS FETCH jobname="$(JOB)"
JOB ENVOY vlass_ql_envoy.condor
VARS ENVOY jobname="$(JOB)"
JOB AUTOQA vlass_ql_auto_qa.condor
VARS AUTOQA jobname="$(JOB)"
PARENT FETCH CHILD ENVOY
PARENT ENVOY CHILD AUTOQA
"""
new_ql_condor = """executable = vlass_ql_post.sh
arguments = metadata.json {{manager_job_id}}
output = ql_post.out
error = ql_post.err
log = condor.log
should_transfer_files = NO
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = (VLASS == True) && (HasLustre == True)
+partition = "VLASS"
queue
"""
old_ql_condor = """executable = vlass_ql_auto_qa.sh
arguments = metadata.json {{manager_job_id}}
output = ql_auto_qa.out
error = ql_auto_qa.err
log = condor.log
should_transfer_files = NO
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = (VLASS == True) && (HasLustre == True)
+partition = "VLASS"
queue
"""
def upgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{new_null_dag}' where filename='null_dag.dag'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{new_ql_dag}' where filename='vlass_ql.dag'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{new_ql_condor}' where filename='vlass_ql_auto_qa.condor'
"""
)
op.execute(
"""
UPDATE workflow_templates
SET filename='vlass_ql_post.sh' WHERE filename='vlass_ql_auto_qa.sh'
"""
)
op.execute(
"""
UPDATE workflow_templates
SET filename='vlass_ql_post.condor' WHERE filename='vlass_ql_auto_qa.condor'
"""
)
def downgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_null_dag}' where filename='null_dag.dag'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_ql_dag}' where filename='vlass_ql.dag'
"""
)
op.execute(
"""
UPDATE workflow_templates
SET filename='vlass_ql_auto_qa.sh' WHERE filename='vlass_ql_post.sh'
"""
)
op.execute(
"""
UPDATE workflow_templates
SET filename='vlass_ql_auto_qa.condor' WHERE filename='vlass_ql_post.condor'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_ql_condor}' where filename='vlass_ql_auto_qa.condor'
"""
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment