diff --git a/apps/cli/utilities/wf_monitor/wf_monitor/enum.py b/apps/cli/utilities/wf_monitor/wf_monitor/enum.py index e4cd70c058f6331a290b0237d3dd727e02f6f520..91ed973aabac9a9f49671db4f8ffeab813ce6f5b 100644 --- a/apps/cli/utilities/wf_monitor/wf_monitor/enum.py +++ b/apps/cli/utilities/wf_monitor/wf_monitor/enum.py @@ -144,3 +144,22 @@ class WorkflowStatusMessages(Enum): # used post 'terminated' conversion COMPLETE = "workflow-complete" + + +class DAGFinalStage(Enum): + """ + Listing of final DAG stage names for determining when to terminate log reading + """ + + CONVEY = "CONVEY" + DELIVER = "DELIVER" + FINISH = "FINISH" + POST = "POST" + + @classmethod + def values(cls): + return cls._value2member_map_ + + @classmethod + def is_final_stage(cls, stage_name: str): + return stage_name in cls.values() diff --git a/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py index 58486e4719c19621a2f34a9472dbd483d8df2473..0f5878628754b52ea8786cb8272e0daa03a5d003 100644 --- a/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py @@ -36,7 +36,7 @@ from typing import Any, Callable, Dict, Tuple, Union, List from messaging.messenger import MessageSender from ._version import ___version___ as VERSION -from .enum import HtcEventCodes, WorkflowStatusMessages +from .enum import HtcEventCodes, WorkflowStatusMessages, DAGFinalStage # pylint: disable=E0401, E0402, W1203 @@ -141,13 +141,12 @@ def is_final_stage(body: str) -> bool: :param body: Body of event log entry :return: boolean for final stage """ - final_list = ["CONVEY", "DELIVER", "AUTOQA"] dag_node = r"DAG Node: (?P<dagnode>[A-Z]{4,})" dagnode = re.search(dag_node, body) logger.info(f"current DAG stage: {dagnode.group('dagnode')}") - if dagnode.group("dagnode") in final_list: + if DAGFinalStage.is_final_stage(dagnode.group("dagnode")): return True return False @@ -309,7 +308,7 @@ class WorkflowMonitor: job_count += 1 is_final_node = is_final_stage(event["condor_metadata"]["log"]) # DAG is still running, change message to 'continue' - if not is_final_node and job_count > 1: + if not is_final_node: event["type"] = WorkflowStatusMessages.CONTINUE.value send_message(event, self.message_router) diff --git a/shared/workspaces/alembic/versions/1aacf11f8ba4_standardize_final_dag_stage.py b/shared/workspaces/alembic/versions/1aacf11f8ba4_standardize_final_dag_stage.py new file mode 100644 index 0000000000000000000000000000000000000000..aa51d5f27b03abb340480ce4c313d2a33af8847c --- /dev/null +++ b/shared/workspaces/alembic/versions/1aacf11f8ba4_standardize_final_dag_stage.py @@ -0,0 +1,170 @@ +"""standardize final dag stage + +Revision ID: 1aacf11f8ba4 +Revises: bccbfba286d7 +Create Date: 2022-10-10 14:58:33.989745 + +""" +from alembic import op + + +# revision identifiers, used by Alembic. +revision = "1aacf11f8ba4" +down_revision = "bccbfba286d7" +branch_labels = None +depends_on = None + +new_null_dag = """JOB GREET null_dag_greeting.condor +VARS GREET jobname="$(JOB)" + +JOB NAP null_dag_nap.condor +VARS NAP jobname="$(JOB)" + +JOB FINISH null_dag_verbose_greeting.condor +VARS FINISH jobname="$(JOB)" + +PARENT GREET CHILD NAP +PARENT NAP CHILD FINISH +""" + +old_null_dag = """JOB GREET null_dag_greeting.condor +VARS GREET jobname="$(JOB)" + +JOB NAP null_dag_nap.condor +VARS NAP jobname="$(JOB)" + +JOB VBOSE_GREET null_dag_verbose_greeting.condor +VARS VBOSE_GREET jobname="$(JOB)" + +PARENT GREET CHILD NAP +PARENT NAP CHILD VBOSE_GREET +""" + + +new_ql_dag = """JOB FETCH vlass_ql_fetch.condor +VARS FETCH jobname="$(JOB)" + +JOB ENVOY vlass_ql_envoy.condor +VARS ENVOY jobname="$(JOB)" + +JOB POST vlass_ql_post.condor +VARS POST jobname="$(JOB)" + +PARENT FETCH CHILD ENVOY +PARENT ENVOY CHILD POST +""" + +old_ql_dag = """JOB FETCH vlass_ql_fetch.condor +VARS FETCH jobname="$(JOB)" + +JOB ENVOY vlass_ql_envoy.condor +VARS ENVOY jobname="$(JOB)" + +JOB AUTOQA vlass_ql_auto_qa.condor +VARS AUTOQA jobname="$(JOB)" + +PARENT FETCH CHILD ENVOY +PARENT ENVOY CHILD AUTOQA +""" + +new_ql_condor = """executable = vlass_ql_post.sh +arguments = metadata.json {{manager_job_id}} + +output = ql_post.out +error = ql_post.err +log = condor.log + +should_transfer_files = NO + +getenv = True +environment = "CAPO_PATH=/home/casa/capo" + +requirements = (VLASS == True) && (HasLustre == True) ++partition = "VLASS" + +queue +""" + +old_ql_condor = """executable = vlass_ql_auto_qa.sh +arguments = metadata.json {{manager_job_id}} + +output = ql_auto_qa.out +error = ql_auto_qa.err +log = condor.log + +should_transfer_files = NO + +getenv = True +environment = "CAPO_PATH=/home/casa/capo" + +requirements = (VLASS == True) && (HasLustre == True) ++partition = "VLASS" + +queue +""" + + +def upgrade(): + op.execute( + f""" + UPDATE workflow_templates + SET content=E'{new_null_dag}' where filename='null_dag.dag' + """ + ) + op.execute( + f""" + UPDATE workflow_templates + SET content=E'{new_ql_dag}' where filename='vlass_ql.dag' + """ + ) + op.execute( + f""" + UPDATE workflow_templates + SET content=E'{new_ql_condor}' where filename='vlass_ql_auto_qa.condor' + """ + ) + op.execute( + """ + UPDATE workflow_templates + SET filename='vlass_ql_post.sh' WHERE filename='vlass_ql_auto_qa.sh' + """ + ) + op.execute( + """ + UPDATE workflow_templates + SET filename='vlass_ql_post.condor' WHERE filename='vlass_ql_auto_qa.condor' + """ + ) + + +def downgrade(): + op.execute( + f""" + UPDATE workflow_templates + SET content=E'{old_null_dag}' where filename='null_dag.dag' + """ + ) + op.execute( + f""" + UPDATE workflow_templates + SET content=E'{old_ql_dag}' where filename='vlass_ql.dag' + """ + ) + op.execute( + """ + UPDATE workflow_templates + SET filename='vlass_ql_auto_qa.sh' WHERE filename='vlass_ql_post.sh' + """ + ) + op.execute( + """ + UPDATE workflow_templates + SET filename='vlass_ql_auto_qa.condor' WHERE filename='vlass_ql_post.condor' + """ + ) + op.execute( + f""" + UPDATE workflow_templates + SET content=E'{old_ql_condor}' where filename='vlass_ql_auto_qa.condor' + """ + )