Skip to content
Snippets Groups Projects

WS-677: Multi-stage workflow research & development

Merged Nathan Hertz requested to merge WS-677-dag-support-for-workflows into main
1 file
+ 54
9
Compare changes
  • Side-by-side
  • Inline
@@ -19,6 +19,7 @@ from messaging.router import Router, on_message
from pycapo import CapoConfig
from requests import Response
from workspaces.system.services.remote_processing_service import CapoInjector
from workspaces.workflow.enum import ArchiveWorkflows, WorkflowRequestState
from workspaces.workflow.message_architect import (
ArchiveMessageArchitect,
@@ -27,7 +28,6 @@ from workspaces.workflow.message_architect import (
from workspaces.workflow.schema import WorkflowRequest, WorkflowRequestFile
from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF
from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF
from workspaces.system.services.remote_processing_service import CapoInjector
logger = logging.getLogger(__name__)
@@ -447,21 +447,56 @@ class WorkflowService(WorkflowServiceIF):
logger.info("executing on folder %s", folder)
# some file in here should end in .dag; that file is our dagman input
# dagman = list(folder.glob("*.dag"))[0]
# logger.info("dagman file %s exists.", dagman)
dag_file = list(folder.glob("*.dag"))[0]
if dag_file:
logger.info("dagman file %s exists.", dag_file)
return self._submit_dag(folder, dag_file)
else:
job_file = list(folder.glob("*.condor"))[0]
logger.info("condor file %s exists.", job_file)
return self._submit_job(folder, job_file)
def _submit_job(self, folder: Path, job_file: Path) -> Path:
"""
Submit job file to HTCondor
:param folder: Folder to execute workflow in
:param job_file: Path to job submit file
:return: PAth to workflow log file
"""
# ensure the log file exists
logfile = self._get_job_logfile_name(job_file)
logger.info("log file %s exists.", logfile)
logfile.touch()
# submit
logger.info("submitting job to condor...")
subprocess.run(
["condor_submit", str(job_file)],
cwd=str(folder.absolute()),
preexec_fn=self._switch_to_submituser,
)
condor = list(folder.glob("*.condor"))[0]
logger.info("condor file %s exists.", condor)
# return the logfile
return logfile
def _submit_dag(self, folder: Path, dag_file: Path) -> Path:
"""
Submit DAG file to HTCondor
:param folder: Folder to execute workflow in
:param dag_file: Path to DAG submit file
:return: Path to workflow log file
"""
# ensure the log file exists
logfile = self._get_logfile_name(condor)
logfile = self._get_dag_logfile_name(dag_file)
logger.info("log file %s exists.", logfile)
logfile.touch()
# submit
logger.info("submitting to condor...")
logger.info("submitting DAG to condor...")
subprocess.run(
["condor_submit", str(condor)],
["condor_submit_dag", str(dag_file)],
cwd=str(folder.absolute()),
preexec_fn=self._switch_to_submituser,
)
@@ -483,7 +518,7 @@ class WorkflowService(WorkflowServiceIF):
os.setuid(submituser_uid)
@staticmethod
def _get_logfile_name(jobfile_name: Path) -> Path:
def _get_job_logfile_name(jobfile_name: Path) -> Path:
"""
Read HTCondor job file and get the log file name, if it exists
@@ -497,6 +532,16 @@ class WorkflowService(WorkflowServiceIF):
return Path(logfile_name.strip())
return Path(f"{jobfile_name.stem}.log")
@staticmethod
def _get_dag_logfile_name(dag_file: Path) -> Path:
"""
Return path to DAG log file, which will always be {dag-file-name}.dagman.log
:param dag_file: Path to workflow DAG file
:return: Path to workflow DAG log
"""
return Path(str(dag_file) + ".dagman.log")
@staticmethod
def _get_forbidden_templates_list(workflow_name: str):
return [
Loading