Skip to content
Snippets Groups Projects

WS-677: Multi-stage workflow research & development

Merged Nathan Hertz requested to merge WS-677-dag-support-for-workflows into main
1 file
+ 5
5
Compare changes
  • Side-by-side
  • Inline
@@ -19,6 +19,7 @@ from messaging.router import Router, on_message
from pycapo import CapoConfig
from requests import Response
from workspaces.system.services.remote_processing_service import CapoInjector
from workspaces.workflow.enum import ArchiveWorkflows, WorkflowRequestState
from workspaces.workflow.message_architect import (
ArchiveMessageArchitect,
@@ -27,7 +28,6 @@ from workspaces.workflow.message_architect import (
from workspaces.workflow.schema import WorkflowRequest, WorkflowRequestFile
from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF
from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF
from workspaces.system.services.remote_processing_service import CapoInjector
logger = logging.getLogger(__name__)
@@ -47,8 +47,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
:return: the response as JSON
"""
response = requests.post(
f"{self.url}/workflows/{request.workflow_name}"
f"/requests/{request.workflow_request_id}/submit"
f"{self.url}/workflows/{request.workflow_name}" f"/requests/{request.workflow_request_id}/submit"
)
logger.info(
"Got result %s with type %s and body %s",
@@ -58,9 +57,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
)
return response.json()
def attach_file_to_request(
self, request: WorkflowRequestIF, filename: str, content: bytes
) -> Response:
def attach_file_to_request(self, request: WorkflowRequestIF, filename: str, content: bytes) -> Response:
"""
Add a file to this workflow request.
@@ -86,14 +83,10 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
:return: dict containing file content
"""
response = requests.get(
f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}"
)
response = requests.get(f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}")
return response.content.decode()
def create_workflow_request(
self, workflow: Union[str, WorkflowIF], argument: Dict
) -> WorkflowRequestIF:
def create_workflow_request(self, workflow: Union[str, WorkflowIF], argument: Dict) -> WorkflowRequestIF:
"""
Create a workflow request using the supplied arguments.
@@ -129,9 +122,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
:param request: completed workflow request to ingest
:return:
"""
requests.post(
f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest"
)
requests.post(f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest")
class WorkflowService(WorkflowServiceIF):
@@ -163,10 +154,7 @@ class WorkflowService(WorkflowServiceIF):
"""
forbidden = self._get_forbidden_templates_list(request.workflow_name)
if filename not in forbidden:
if (
ArchiveWorkflows.is_archive_wf(request.workflow_name)
and filename == "metadata.json"
):
if ArchiveWorkflows.is_archive_wf(request.workflow_name) and filename == "metadata.json":
content_dict = json.loads(content.decode())
content_dict["workflowName"] = request.workflow_name
content_dict["data_location"] = request.argument["data_location"]
@@ -199,9 +187,7 @@ class WorkflowService(WorkflowServiceIF):
:param workflow_request_id: ID of carta workflow that wants to send this message
:param carta_url: JSON blob with CARTA URL
"""
logger.info(
f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!"
)
logger.info(f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!")
wf_request = self.info.lookup_workflow_request(workflow_request_id)
routing_key = f"ws-workflow.carta-instance-ready.{workflow_request_id}"
carta_url_msg = ArchiveMessageArchitect(
@@ -242,11 +228,7 @@ class WorkflowService(WorkflowServiceIF):
# create a temporary directory if processing directory is not supplied,
# needs to exist before template rendering
temp_folder = (
self._make_temp_directory(request)
if not request.results_dir
else Path(request.results_dir)
)
temp_folder = self._make_temp_directory(request) if not request.results_dir else Path(request.results_dir)
# if remote is true, create a capo subspace file in the request's directory
if remote:
@@ -326,9 +308,7 @@ class WorkflowService(WorkflowServiceIF):
)
request.argument["ramInGb"] = self.processing_settings.ramInGb
def _render_with_metadata(
self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF
):
def _render_with_metadata(self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF):
name = wf_request.workflow_name
if "calibration" in name:
wrest_type = "-sc"
@@ -348,9 +328,7 @@ class WorkflowService(WorkflowServiceIF):
argument2 = []
elif "ingest" in name:
wrest_type = "-aux"
parent_req = self.info.lookup_workflow_request(
int(wf_request.argument["parent_wf_request_id"])
)
parent_req = self.info.lookup_workflow_request(int(wf_request.argument["parent_wf_request_id"]))
eb = (
parent_req.argument["product_locator"]
if "product_locator" in parent_req.argument
@@ -381,15 +359,11 @@ class WorkflowService(WorkflowServiceIF):
else:
logger.error(wf_json.decode())
logger.info("SENDING WORKFLOW FAIL MESSAGE!")
failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message(
"workflow_failed"
)
failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message("workflow_failed")
self.messenger.send_message(**failed_msg)
return wf_request
def _determine_usable_files(
self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile]
):
def _determine_usable_files(self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile]):
# Override templates if user supplied file has same name and is a valid input file
usable_templates = []
usable_files = []
@@ -447,21 +421,56 @@ class WorkflowService(WorkflowServiceIF):
logger.info("executing on folder %s", folder)
# some file in here should end in .dag; that file is our dagman input
# dagman = list(folder.glob("*.dag"))[0]
# logger.info("dagman file %s exists.", dagman)
dag_files = list(folder.glob("*.dag"))
if dag_files:
logger.info("dagman file %s exists.", dag_files[0])
return self._submit_dag(folder, dag_files[0])
else:
job_file = list(folder.glob("*.condor"))[0]
logger.info("condor file %s exists.", job_file)
return self._submit_job(folder, job_file)
def _submit_job(self, folder: Path, job_file: Path) -> Path:
    """
    Submit a plain (non-DAG) job file to HTCondor via ``condor_submit``.

    :param folder: Folder to execute the workflow in (used as the submit cwd)
    :param job_file: Path to the HTCondor ``.condor`` submit file
    :return: Path to workflow log file
    """
    # ensure the log file exists
    logfile = self._get_job_logfile_name(job_file)
    logger.info("log file %s exists.", logfile)
    logfile.touch()
    # submit
    logger.info("submitting job to condor...")
    # NOTE(review): the return code of condor_submit is not checked, so a failed
    # submission passes silently here — consider check=True. preexec_fn switches
    # the child process to the submit user (os.setuid) before condor_submit runs.
    subprocess.run(
        ["condor_submit", str(job_file)],
        cwd=str(folder.absolute()),
        preexec_fn=self._switch_to_submituser,
    )
    # hand the log path back to the caller
    return logfile
condor = list(folder.glob("*.condor"))[0]
logger.info("condor file %s exists.", condor)
def _submit_dag(self, folder: Path, dag_file: Path) -> Path:
"""
Submit DAG file to HTCondor
:param folder: Folder to execute workflow in
:param dag_file: Path to DAG submit file
:return: Path to workflow log file
"""
# ensure the log file exists
logfile = self._get_logfile_name(condor)
logfile = self._get_dag_logfile_name(dag_file)
logger.info("log file %s exists.", logfile)
logfile.touch()
# submit
logger.info("submitting to condor...")
logger.info("submitting DAG to condor...")
subprocess.run(
["condor_submit", str(condor)],
["condor_submit_dag", "-f", str(dag_file)],
cwd=str(folder.absolute()),
preexec_fn=self._switch_to_submituser,
)
@@ -483,7 +492,7 @@ class WorkflowService(WorkflowServiceIF):
os.setuid(submituser_uid)
@staticmethod
def _get_logfile_name(jobfile_name: Path) -> Path:
def _get_job_logfile_name(jobfile_name: Path) -> Path:
"""
Read HTCondor job file and get the log file name, if it exists
@@ -497,6 +506,16 @@ class WorkflowService(WorkflowServiceIF):
return Path(logfile_name.strip())
return Path(f"{jobfile_name.stem}.log")
@staticmethod
def _get_dag_logfile_name(dag_file: Path) -> Path:
"""
Return path to DAG log file, which will always be {dag-file-name}.dagman.log
:param dag_file: Path to workflow DAG file
:return: Path to workflow DAG log
"""
return Path(str(dag_file) + ".dagman.log")
@staticmethod
def _get_forbidden_templates_list(workflow_name: str):
return [
@@ -569,9 +588,9 @@ class WorkflowMessageHandler:
logger.info("SENDING INGESTION COMPLETE MESSAGE!")
subject["execution_wf_id"] = wf_request.argument["parent_wf_request_id"]
ingestion_complete_msg = WorkflowMessageArchitect(
previous_info=subject
).compose_message("ingestion_complete")
ingestion_complete_msg = WorkflowMessageArchitect(previous_info=subject).compose_message(
"ingestion_complete"
)
self.messenger.send_message(**ingestion_complete_msg)
@on_message(service="workflow")
@@ -594,9 +613,7 @@ class WorkflowMessageHandler:
if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]):
# Workflow has corresponding condor job ID
logger.info(
f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!"
)
logger.info(f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!")
request.htcondor_job_id = htcondor_job_id
elif message["type"] == "workflow-complete":
status = WorkflowRequestState.Complete.name
@@ -631,8 +648,7 @@ class WorkflowMessageHandler:
except Exception as exc:
transaction.abort()
logger.error(
f"Failed to update status on workflow request "
f"{request.workflow_request_id} to {status}: {exc}"
f"Failed to update status on workflow request " f"{request.workflow_request_id} to {status}: {exc}"
)
else:
logger.warning(f"Message {message} does not concern a workflow request. Ignoring.")
@@ -642,24 +658,19 @@ class WorkflowMessageHandler:
wf_id = subject["workflow_request_id"]
wf_request = self.info.lookup_workflow_request(wf_id)
if (
wf_request.workflow_name == ArchiveWorkflows.CARTA.value
and wf_request.argument["notify_ready"] is False
):
logger.info(
f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!"
)
if wf_request.workflow_name == ArchiveWorkflows.CARTA.value and wf_request.argument["notify_ready"] is False:
logger.info(f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!")
routing_key = f"ws-workflow.carta-instance-ready.{wf_id}"
carta_url_msg = ArchiveMessageArchitect(
routing_key=routing_key, request=wf_request
).compose_message("carta_failed")
carta_url_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
"carta_failed"
)
self.archive_messenger.send_message(**carta_url_msg)
if wf_request.workflow_name == ArchiveWorkflows.SECI.value:
logger.info(f"SENDING FAILED SECI MESSAGE to VLASS Manager for request #{wf_id}!")
routing_key = f"ws-workflow.seci.{wf_id}"
seci_msg = ArchiveMessageArchitect(
routing_key=routing_key, request=wf_request
).compose_message("seci_failed")
seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
"seci_failed"
)
self.archive_messenger.send_message(**seci_msg)
def send_archive_complete_event(self, **message: Dict):
@@ -670,9 +681,9 @@ class WorkflowMessageHandler:
if wf_request.workflow_name == ArchiveWorkflows.SECI.value:
logger.info(f"SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!")
routing_key = f"ws-workflow.seci.{wf_id}"
seci_msg = ArchiveMessageArchitect(
routing_key=routing_key, request=wf_request
).compose_message("seci_complete")
seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
"seci_complete"
)
self.archive_messenger.send_message(**seci_msg)
def clean_remote_workflow(self, request: WorkflowRequestIF):
Loading