From 2b824f59c4d78c604e110332403f8c7573203921 Mon Sep 17 00:00:00 2001
From: nhertz <nhertz@nrao.edu>
Date: Tue, 14 Sep 2021 15:11:47 -0600
Subject: [PATCH] Finalize support for DAGs in `workflow_service`

---
 .../workflow/services/workflow_service.py     | 63 ++++++++++++++++---
 1 file changed, 54 insertions(+), 9 deletions(-)

diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py
index 803170b45..c72e5b24f 100644
--- a/shared/workspaces/workspaces/workflow/services/workflow_service.py
+++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py
@@ -19,6 +19,7 @@ from messaging.router import Router, on_message
 from pycapo import CapoConfig
 from requests import Response
 
+from workspaces.system.services.remote_processing_service import CapoInjector
 from workspaces.workflow.enum import ArchiveWorkflows, WorkflowRequestState
 from workspaces.workflow.message_architect import (
     ArchiveMessageArchitect,
@@ -27,7 +28,6 @@ from workspaces.workflow.message_architect import (
 from workspaces.workflow.schema import WorkflowRequest, WorkflowRequestFile
 from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF
 from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF
-from workspaces.system.services.remote_processing_service import CapoInjector
 
 
 logger = logging.getLogger(__name__)
@@ -447,21 +447,56 @@ class WorkflowService(WorkflowServiceIF):
         logger.info("executing on folder %s", folder)
 
         # some file in here should end in .dag; that file is our dagman input
-        # dagman = list(folder.glob("*.dag"))[0]
-        # logger.info("dagman file %s exists.", dagman)
+        dag_file = list(folder.glob("*.dag"))[0]
+        if dag_file:
+            logger.info("dagman file %s exists.", dag_file)
+            return self._submit_dag(folder, dag_file)
+        else:
+            job_file = list(folder.glob("*.condor"))[0]
+            logger.info("condor file %s exists.", job_file)
+            return self._submit_job(folder, job_file)
+
+    def _submit_job(self, folder: Path, job_file: Path) -> Path:
+        """
+        Submit job file to HTCondor
+
+        :param folder: Folder to execute workflow in
+        :param job_file: Path to job submit file
+        :return: PAth to workflow log file
+        """
+        # ensure the log file exists
+        logfile = self._get_job_logfile_name(job_file)
+        logger.info("log file %s exists.", logfile)
+        logfile.touch()
+
+        # submit
+        logger.info("submitting job to condor...")
+        subprocess.run(
+            ["condor_submit", str(job_file)],
+            cwd=str(folder.absolute()),
+            preexec_fn=self._switch_to_submituser,
+        )
 
-        condor = list(folder.glob("*.condor"))[0]
-        logger.info("condor file %s exists.", condor)
+        # return the logfile
+        return logfile
+
+    def _submit_dag(self, folder: Path, dag_file: Path) -> Path:
+        """
+        Submit DAG file to HTCondor
 
+        :param folder: Folder to execute workflow in
+        :param dag_file: Path to DAG submit file
+        :return: Path to workflow log file
+        """
         # ensure the log file exists
-        logfile = self._get_logfile_name(condor)
+        logfile = self._get_dag_logfile_name(dag_file)
         logger.info("log file %s exists.", logfile)
         logfile.touch()
 
         # submit
-        logger.info("submitting to condor...")
+        logger.info("submitting DAG to condor...")
         subprocess.run(
-            ["condor_submit", str(condor)],
+            ["condor_submit_dag", str(dag_file)],
             cwd=str(folder.absolute()),
             preexec_fn=self._switch_to_submituser,
         )
@@ -483,7 +518,7 @@ class WorkflowService(WorkflowServiceIF):
         os.setuid(submituser_uid)
 
     @staticmethod
-    def _get_logfile_name(jobfile_name: Path) -> Path:
+    def _get_job_logfile_name(jobfile_name: Path) -> Path:
         """
         Read HTCondor job file and get the log file name, if it exists
 
@@ -497,6 +532,16 @@ class WorkflowService(WorkflowServiceIF):
                     return Path(logfile_name.strip())
         return Path(f"{jobfile_name.stem}.log")
 
+    @staticmethod
+    def _get_dag_logfile_name(dag_file: Path) -> Path:
+        """
+        Return path to DAG log file, which will always be {dag-file-name}.dagman.log
+
+        :param dag_file: Path to workflow DAG file
+        :return: Path to workflow DAG log
+        """
+        return Path(str(dag_file) + ".dagman.log")
+
     @staticmethod
     def _get_forbidden_templates_list(workflow_name: str):
         return [
-- 
GitLab