From 7c29f5031593c1ccb926528aa8e3490fe596e89c Mon Sep 17 00:00:00 2001
From: chausman <chausman@nrao.edu>
Date: Mon, 10 Jun 2024 15:13:06 -0600
Subject: [PATCH] - try to ensure all required log files exist before submission
 - message cleanup for external workflows

---
 .../wf_monitor/wf_monitor/monitor.py          |  5 +-
 .../workflow/services/workflow_service.py     | 77 ++++++++++++++-----
 2 files changed, 58 insertions(+), 24 deletions(-)

diff --git a/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py
index ffbe8197c..a546340b9 100644
--- a/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py
+++ b/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py
@@ -24,7 +24,6 @@ RabbitMQ exchange. Also input is a workflow request ID that is sent alongside th
 import argparse
 import functools
 import logging
-import os
 import re
 import signal
 import time
@@ -245,7 +244,7 @@ class WorkflowMonitor:
         time_remaining = timeout
         time_between_attempts = 3  # seconds
         signal.alarm(timeout)
-        while not os.path.exists(log_path):
+        while not log_path.is_file():
             logger.info(f"Attempting to read log file {log_path}... {time_remaining} seconds left...")
             time.sleep(time_between_attempts)
             time_remaining -= time_between_attempts
@@ -622,6 +621,6 @@ def main():
     """
 
     args = make_arg_parser().parse_args()
-    wf_monitor = WorkflowMonitor(args.log_path, int(args.workflow_request_id), eval(args.is_dag))
+    wf_monitor = WorkflowMonitor(args.log_path.rstrip(), int(args.workflow_request_id), eval(args.is_dag))
     logger.info(f"Closing wf_monitor for workflow request #{args.workflow_request_id}")
     wf_monitor.close()
diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py
index 8da2b05f1..5f8f22269 100644
--- a/shared/workspaces/workspaces/workflow/services/workflow_service.py
+++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py
@@ -367,12 +367,14 @@ class WorkflowService(WorkflowServiceIF):
         request.argument["spool_dir"] = request.results_dir
 
         # Send results_dir to capability layer to store in the workflow metadata
-        dir_info = WorkflowMessageArchitect(
-            request=request, json_payload={"processing_dir": request.argument["spool_dir"]}
-        ).compose_message("update_wf_metadata")
+        if request.controller == "WS":
+            # Don't send to external controllers
+            dir_info = WorkflowMessageArchitect(
+                request=request, json_payload={"processing_dir": request.argument["spool_dir"]}
+            ).compose_message("update_wf_metadata")
 
-        # send the message
-        self.messenger.send_message(**dir_info)
+            # send the message
+            self.messenger.send_message(**dir_info)
 
         # render all the templates
         if request.argument["need_project_metadata"] is True:
@@ -703,8 +705,7 @@ class WorkflowService(WorkflowServiceIF):
         """
         # ensure the log file exists
         logfile = self._get_job_logfile_name(job_file)
-        logger.info("log file %s exists.", logfile)
-        logfile.touch()
+        self._create_base_logfile(logfile)
 
         # submit
         logger.info("submitting job to condor...")
@@ -725,10 +726,12 @@ class WorkflowService(WorkflowServiceIF):
         :param dag_file: Path to DAG submit file
         :return: Path to workflow log file
         """
-        # ensure the log file exists
+        # ensure the dagman.log file exists
         logfile = self._get_dag_logfile_name(dag_file)
-        logger.info("log file %s exists.", logfile)
-        logfile.touch()
+        self._create_base_logfile(logfile)
+        # ensure the condor.log file exists, don't rely on workflow creation, DAG monitoring requires both
+        condorlog_path = folder.absolute() / "condor.log"
+        self._create_base_logfile(condorlog_path)
 
         # submit
         logger.info("submitting DAG to condor...")
@@ -780,6 +783,30 @@ class WorkflowService(WorkflowServiceIF):
         """
         return Path(str(dag_file) + ".dagman.log")
 
+    def _create_base_logfile(self, logfile: Path, iteration: int = 1) -> bool:
+        """
+        Create the expected log file for submission
+
+        :param logfile: the log to create
+        :param iteration: the number of current creation attempts
+        :return: boolean representing if file was created
+        """
+        if iteration == 3:
+        # allow a few creation attempts, but cap retries to avoid looping indefinitely
+            logger.warning(f"Warning: Failed to create {logfile} multiple times. Workflow may fail.")
+            return False
+
+        # create needed logfile if it doesn't exist
+        logfile.touch()
+        # verify that the file was created
+        if logfile.is_file():
+            logger.info("log file %s exists.", logfile)
+            return True
+        else:
+            logger.warning(f"Failed to create {logfile}. Trying again")
+            iteration += 1
+            return self._create_base_logfile(logfile, iteration)
+
     @staticmethod
     def _get_forbidden_templates_list(wf_request: WorkflowRequest) -> List[str]:
         """
@@ -975,10 +1002,12 @@ class WorkflowMessageHandler:
                 request.htcondor_job_id = htcondor_job_id
                 request.htcondor_iterations = 1
                 # update UI
-                iterations_msg = WorkflowMessageArchitect(
-                    request=request, json_payload={"htcondor_iterations": request.htcondor_iterations}
-                ).compose_message("update_wf_metadata")
-                self.messenger.send_message(**iterations_msg)
+                if request.controller == "WS":
+                    # don't send to external systems
+                    iterations_msg = WorkflowMessageArchitect(
+                        request=request, json_payload={"htcondor_iterations": request.htcondor_iterations}
+                    ).compose_message("update_wf_metadata")
+                    self.messenger.send_message(**iterations_msg)
 
         elif message["type"] == "workflow-updated":
             status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
@@ -990,6 +1019,10 @@ class WorkflowMessageHandler:
                 if any(c.control_system == request.controller for c in self.info.all_external_controllers()):
                     tack_on = {**message, **stage_info}
                     self.send_external_event("update", **tack_on)
+            else:
+                logger.warning(
+                    f"Unable to update stage for request #{request.workflow_request_id}, received stage_info {stage_info}"
+                )
 
         elif message["type"] == "workflow-failed":
             request = self.info.refresh_request(request)
@@ -1018,13 +1051,15 @@ class WorkflowMessageHandler:
             logger.warning(user_message)
 
             # update UI
-            paused_msg = WorkflowMessageArchitect(
-                request=request,
-                json_payload={
-                    "user_warning_message": user_message,
-                },
-            ).compose_message("update_wf_metadata")
-            self.messenger.send_message(**paused_msg)
+            if request.controller == "WS":
+                # don't send to external system
+                paused_msg = WorkflowMessageArchitect(
+                    request=request,
+                    json_payload={
+                        "user_warning_message": user_message,
+                    },
+                ).compose_message("update_wf_metadata")
+                self.messenger.send_message(**paused_msg)
 
         elif message["type"] == "workflow-restarted":
             request.htcondor_iterations = request.htcondor_iterations + 1
-- 
GitLab