From 61c760415f0e0657aa637158ef8c1b68f3c63a85 Mon Sep 17 00:00:00 2001
From: Nathan Hertz <nhertz@nrao.edu>
Date: Wed, 15 Sep 2021 13:43:00 -0400
Subject: [PATCH] WS-677: Multi-stage workflow research & development

---
 ...90c5cd70082_add_dag_ified_null_workflow.py |  73 +++++++++
 .../workspaces/test/test_workflow_service.py  |  83 ++++++++++
 .../workflow/services/workflow_service.py     | 154 ++++++++++--------
 3 files changed, 239 insertions(+), 71 deletions(-)
 create mode 100644 schema/versions/990c5cd70082_add_dag_ified_null_workflow.py

diff --git a/schema/versions/990c5cd70082_add_dag_ified_null_workflow.py b/schema/versions/990c5cd70082_add_dag_ified_null_workflow.py
new file mode 100644
index 000000000..eeb6ca852
--- /dev/null
+++ b/schema/versions/990c5cd70082_add_dag_ified_null_workflow.py
@@ -0,0 +1,73 @@
+"""add DAG-ified null workflow
+
+Revision ID: 990c5cd70082
+Revises: 257537f99abc
+Create Date: 2021-09-14 15:12:31.154288
+
+"""
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "990c5cd70082"
+down_revision = "257537f99abc"
+branch_labels = None
+depends_on = None
+
+null_dag_script = """#!/bin/sh
+
+./null $*
+"""
+null_dag_job_submit = """executable = null_dag.sh
+arguments = {}
+
+output = null_dag.$(jobname).out
+error = null_dag.$(jobname).err
+log = null_dag.$(jobname).log
+
+SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
+should_transfer_files = yes
+transfer_input_files = $(SBIN_PATH)/null
+
+queue
+"""
+null_dag_dag_submit = """JOB GREET null_dag_greeting.condor
+VARS GREET jobname="$(JOB)"
+
+JOB NAP null_dag_nap.condor
+VARS NAP jobname="$(JOB)"
+
+JOB VBOSE_GREET null_dag_verbose_greeting.condor
+VARS VBOSE_GREET jobname="$(JOB)"
+
+PARENT GREET CHILD NAP
+PARENT NAP CHILD VBOSE_GREET
+"""
+
+
+def upgrade():
+    op.execute("INSERT INTO workflows (workflow_name) VALUES ('null_dag')")
+    op.execute(
+        "INSERT INTO workflow_templates (workflow_name, filename, content) "
+        f"VALUES ('null_dag', 'null_dag.sh', E'{null_dag_script}')"
+    )
+    op.execute(
+        "INSERT INTO workflow_templates (workflow_name, filename, content) "
+        f"VALUES ('null_dag', 'null_dag_greeting.condor', E'{null_dag_job_submit.format('--greeting')}')"
+    )
+    op.execute(
+        "INSERT INTO workflow_templates (workflow_name, filename, content) "
+        f"VALUES ('null_dag', 'null_dag_nap.condor', E'{null_dag_job_submit.format('--verbose --nap')}')"
+    )
+    op.execute(
+        "INSERT INTO workflow_templates (workflow_name, filename, content) "
+        f"VALUES ('null_dag', 'null_dag_verbose_greeting.condor', E'{null_dag_job_submit.format('--verbose --greeting')}')"
+    )
+    op.execute(
+        "INSERT INTO workflow_templates (workflow_name, filename, content) "
+        f"VALUES ('null_dag', 'null_dag.dag', E'{null_dag_dag_submit}')"
+    )
+
+
+def downgrade():
+    op.execute("DELETE FROM workflows WHERE workflow_name = 'null_dag'")
+    op.execute("DELETE FROM workflow_templates WHERE workflow_name = 'null_dag'")
diff --git a/shared/workspaces/test/test_workflow_service.py b/shared/workspaces/test/test_workflow_service.py
index 7da509017..d05100095 100644
--- a/shared/workspaces/test/test_workflow_service.py
+++ b/shared/workspaces/test/test_workflow_service.py
@@ -1,7 +1,9 @@
 """ Tests for the workflow service """
 import json
+import os
 import pathlib
+import shutil
 from typing import List
 
 # pylint: disable=E0401, R0201, W0212
 
@@ -133,6 +135,7 @@ class TestWorkflowService:
             },
             "type": "workflow-complete",
         }
+        assert request.state == "Ready"
         with patch("workspaces.workflow.schema.WorkflowRequest.update_status") as mock_update:
@@ -175,6 +178,86 @@ class TestWorkflowService:
         usable = mock_workflow_service._determine_usable_files(request, files)
         assert len(usable) == 3
 
+    def test_execute_prepared(self, mock_workflow_service: WorkflowService, tmp_path: pathlib.Path):
+        # Create fake folder and DAG file
+        fake_folder = tmp_path / "test"
+        fake_folder.mkdir()
+        fake_dagfile = fake_folder / "test.dag"
+        fake_dagfile.touch()
+
+        orig_submit_dag = mock_workflow_service._submit_dag
+        mock_workflow_service._submit_dag = MagicMock()
+
+        # Test DAG submission
+        mock_workflow_service._execute_prepared(fake_folder)
+        mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile)
+
+        # DAG cleanup
+        mock_workflow_service._submit_dag = orig_submit_dag
+        os.remove(fake_dagfile)
+
+        # Create fake submit file
+        fake_jobfile = fake_folder / "test.condor"
+        fake_jobfile.touch()
+
+        orig_submit_job = mock_workflow_service._submit_job
+        mock_workflow_service._submit_job = MagicMock()
+
+        # Test job submission
+        mock_workflow_service._execute_prepared(fake_folder)
+        mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile)
+
+        # Cleanup
+        mock_workflow_service._submit_job = orig_submit_job
+        shutil.rmtree(tmp_path)
+
+    def test_submit_job(self, mock_workflow_service: WorkflowService):
+        fake_folder = pathlib.Path("fake_folder")
+        fake_jobfile = pathlib.Path("fake.submit")
+        fake_jobfile.touch()
+        with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
+            log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile)
+            mock_sp_run.assert_called_with(
+                ["condor_submit", fake_jobfile.name],
+                cwd=str(fake_folder.absolute()),
+                preexec_fn=mock_workflow_service._switch_to_submituser,
+            )
+            assert log_path.name == "fake.log"
+
+        os.remove(fake_jobfile)
+
+    def test_submit_dag(self, mock_workflow_service: WorkflowService):
+        fake_folder = pathlib.Path("fake_folder")
+        fake_dagfile = pathlib.Path("fake.dag")
+        with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
+            log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile)
+            mock_sp_run.assert_called_with(
+                ["condor_submit_dag", "-f", fake_dagfile.name],
+                cwd=str(fake_folder.absolute()),
+                preexec_fn=mock_workflow_service._switch_to_submituser,
+            )
+            assert log_path.name == "fake.dag.dagman.log"
+
+    def test_get_jobfile_log(self, tmp_path: pathlib.Path, mock_workflow_service: WorkflowService):
+        with open(tmp_path / "fake.submit", "w") as fake_jobfile:
+            # Test with a file that doesn't contain a "log = " entry
+            log_path = mock_workflow_service._get_job_logfile_name(tmp_path / "fake.submit")
+            assert log_path.name == "fake.log"
+            # Write "log = " entry
+            fake_jobfile.write("log = test.log")
+            fake_jobfile.flush()  # flush so _get_job_logfile_name sees the entry on disk
+
+            # Test with same file that now contains a "log = " entry
+            log_path = mock_workflow_service._get_job_logfile_name(tmp_path / "fake.submit")
+            assert log_path.name == "test.log"
+
+        # Remove tmp_path just in case
+        shutil.rmtree(tmp_path)
+
+    def test_get_dagfile_log(self, mock_workflow_service: WorkflowService):
+        log_path = mock_workflow_service._get_dag_logfile_name(pathlib.Path("fake.dag"))
+        assert log_path.name == "fake.dag.dagman.log"
+
     @pytest.mark.skip("test broken")
     @patch("workspaces.workflow.services.workflow_service.Router")
     def test_send_carta_url_to_aat(self, mock_router: MagicMock, mock_workflow_service: WorkflowService):
diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py
index 803170b45..dceb755b9 100644
--- a/shared/workspaces/workspaces/workflow/services/workflow_service.py
+++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py
@@ -19,6 +19,7 @@ from messaging.router import Router, on_message
 from pycapo import CapoConfig
 from requests import Response
 
+from workspaces.system.services.remote_processing_service import CapoInjector
 from workspaces.workflow.enum import ArchiveWorkflows, WorkflowRequestState
 from workspaces.workflow.message_architect import (
     ArchiveMessageArchitect,
@@ -27,7 +28,6 @@ from workspaces.workflow.message_architect import (
 from workspaces.workflow.schema import WorkflowRequest, WorkflowRequestFile
 from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF
 from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF
-from workspaces.system.services.remote_processing_service import CapoInjector
 
 logger = logging.getLogger(__name__)
 
@@ -47,8 +47,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
         :return: the response as JSON
         """
         response = requests.post(
-            f"{self.url}/workflows/{request.workflow_name}"
-            f"/requests/{request.workflow_request_id}/submit"
+            f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/submit"
         )
         logger.info(
             "Got result %s with type %s and body %s",
@@ -58,9 +57,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
         )
         return response.json()
 
-    def attach_file_to_request(
-        self, request: WorkflowRequestIF, filename: str, content: bytes
-    ) -> Response:
+    def attach_file_to_request(self, request: WorkflowRequestIF, filename: str, content: bytes) -> Response:
         """
         Add a file to this workflow request.
 
@@ -86,14 +83,10 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
 
         :return: dict containing file content
         """
-        response = requests.get(
-            f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}"
-        )
+        response = requests.get(f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}")
         return response.content.decode()
 
-    def create_workflow_request(
-        self, workflow: Union[str, WorkflowIF], argument: Dict
-    ) -> WorkflowRequestIF:
+    def create_workflow_request(self, workflow: Union[str, WorkflowIF], argument: Dict) -> WorkflowRequestIF:
         """
         Create a workflow request using the supplied arguments.
 
@@ -129,9 +122,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
         :param request: completed workflow request to ingest
         :return:
         """
-        requests.post(
-            f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest"
-        )
+        requests.post(f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest")
 
 
 class WorkflowService(WorkflowServiceIF):
@@ -163,10 +154,7 @@ class WorkflowService(WorkflowServiceIF):
         """
         forbidden = self._get_forbidden_templates_list(request.workflow_name)
         if filename not in forbidden:
-            if (
-                ArchiveWorkflows.is_archive_wf(request.workflow_name)
-                and filename == "metadata.json"
-            ):
+            if ArchiveWorkflows.is_archive_wf(request.workflow_name) and filename == "metadata.json":
                 content_dict = json.loads(content.decode())
                 content_dict["workflowName"] = request.workflow_name
                 content_dict["data_location"] = request.argument["data_location"]
@@ -199,9 +187,7 @@ class WorkflowService(WorkflowServiceIF):
         :param workflow_request_id: ID of carta workflow that wants to send this message
         :param carta_url: JSON blob with CARTA URL
         """
-        logger.info(
-            f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!"
-        )
+        logger.info(f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!")
         wf_request = self.info.lookup_workflow_request(workflow_request_id)
         routing_key = f"ws-workflow.carta-instance-ready.{workflow_request_id}"
         carta_url_msg = ArchiveMessageArchitect(
@@ -242,11 +228,7 @@ class WorkflowService(WorkflowServiceIF):
 
         # create a temporary directory if processing directory is not supplied,
         # needs to exist before template rendering
-        temp_folder = (
-            self._make_temp_directory(request)
-            if not request.results_dir
-            else Path(request.results_dir)
-        )
+        temp_folder = self._make_temp_directory(request) if not request.results_dir else Path(request.results_dir)
 
         # if remote is true, create a capo subspace file in the request's directory
         if remote:
@@ -326,9 +308,7 @@ class WorkflowService(WorkflowServiceIF):
             )
             request.argument["ramInGb"] = self.processing_settings.ramInGb
 
-    def _render_with_metadata(
-        self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF
-    ):
+    def _render_with_metadata(self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF):
         name = wf_request.workflow_name
         if "calibration" in name:
             wrest_type = "-sc"
@@ -348,9 +328,7 @@ class WorkflowService(WorkflowServiceIF):
             argument2 = []
         elif "ingest" in name:
             wrest_type = "-aux"
-            parent_req = self.info.lookup_workflow_request(
-                int(wf_request.argument["parent_wf_request_id"])
-            )
+            parent_req = self.info.lookup_workflow_request(int(wf_request.argument["parent_wf_request_id"]))
             eb = (
                 parent_req.argument["product_locator"]
                 if "product_locator" in parent_req.argument
@@ -381,15 +359,11 @@ class WorkflowService(WorkflowServiceIF):
         else:
             logger.error(wf_json.decode())
             logger.info("SENDING WORKFLOW FAIL MESSAGE!")
-            failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message(
-                "workflow_failed"
-            )
+            failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message("workflow_failed")
             self.messenger.send_message(**failed_msg)
 
         return wf_request
 
-    def _determine_usable_files(
-        self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile]
-    ):
+    def _determine_usable_files(self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile]):
         # Override templates if user supplied file has same name and is a valid input file
         usable_templates = []
         usable_files = []
@@ -447,21 +421,57 @@ class WorkflowService(WorkflowServiceIF):
         logger.info("executing on folder %s", folder)
 
         # some file in here should end in .dag; that file is our dagman input
-        # dagman = list(folder.glob("*.dag"))[0]
-        # logger.info("dagman file %s exists.", dagman)
+        dag_files = list(folder.glob("*.dag"))
+        if dag_files:
+            logger.info("dagman file %s exists.", dag_files[0])
+            return self._submit_dag(folder, dag_files[0])
+        else:
+            job_file = list(folder.glob("*.condor"))[0]
+            logger.info("condor file %s exists.", job_file)
+            return self._submit_job(folder, job_file)
+
+    def _submit_job(self, folder: Path, job_file: Path) -> Path:
+        """
+        Submit job file to HTCondor
+
+        :param folder: Folder to execute workflow in
+        :param job_file: Path to job submit file
+        :return: Path to workflow log file
+        """
+        # ensure the log file exists
+        logfile = self._get_job_logfile_name(job_file)
+        logger.info("log file %s exists.", logfile)
+        logfile.touch()
+
+        # submit
+        logger.info("submitting job to condor...")
+        subprocess.run(
+            ["condor_submit", str(job_file)],
+            cwd=str(folder.absolute()),
+            preexec_fn=self._switch_to_submituser,
+        )
+
+        # return the logfile
+        return logfile
+
-        condor = list(folder.glob("*.condor"))[0]
-        logger.info("condor file %s exists.", condor)
+    def _submit_dag(self, folder: Path, dag_file: Path) -> Path:
+        """
+        Submit DAG file to HTCondor
+
+        :param folder: Folder to execute workflow in
+        :param dag_file: Path to DAG submit file
+        :return: Path to workflow log file
+        """
         # ensure the log file exists
-        logfile = self._get_logfile_name(condor)
+        logfile = self._get_dag_logfile_name(dag_file)
         logger.info("log file %s exists.", logfile)
         logfile.touch()
 
         # submit
-        logger.info("submitting to condor...")
+        logger.info("submitting DAG to condor...")
         subprocess.run(
-            ["condor_submit", str(condor)],
+            ["condor_submit_dag", "-f", str(dag_file)],
             cwd=str(folder.absolute()),
             preexec_fn=self._switch_to_submituser,
         )
@@ -483,7 +492,7 @@ class WorkflowService(WorkflowServiceIF):
         os.setuid(submituser_uid)
 
     @staticmethod
-    def _get_logfile_name(jobfile_name: Path) -> Path:
+    def _get_job_logfile_name(jobfile_name: Path) -> Path:
         """
         Read HTCondor job file and get the log file name, if it exists
@@ -497,6 +506,16 @@ class WorkflowService(WorkflowServiceIF):
                 return Path(logfile_name.strip())
         return Path(f"{jobfile_name.stem}.log")
 
+    @staticmethod
+    def _get_dag_logfile_name(dag_file: Path) -> Path:
+        """
+        Return path to DAG log file, which will always be {dag-file-name}.dagman.log
+
+        :param dag_file: Path to workflow DAG file
+        :return: Path to workflow DAG log
+        """
+        return Path(str(dag_file) + ".dagman.log")
+
     @staticmethod
     def _get_forbidden_templates_list(workflow_name: str):
         return [
@@ -569,9 +588,9 @@ class WorkflowMessageHandler:
                 logger.info("SENDING INGESTION COMPLETE MESSAGE!")
                 subject["execution_wf_id"] = wf_request.argument["parent_wf_request_id"]
 
-                ingestion_complete_msg = WorkflowMessageArchitect(
-                    previous_info=subject
-                ).compose_message("ingestion_complete")
+                ingestion_complete_msg = WorkflowMessageArchitect(previous_info=subject).compose_message(
+                    "ingestion_complete"
+                )
                 self.messenger.send_message(**ingestion_complete_msg)
 
     @on_message(service="workflow")
@@ -594,9 +613,7 @@ class WorkflowMessageHandler:
 
             if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]):
                 # Workflow has corresponding condor job ID
-                logger.info(
-                    f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!"
-                )
+                logger.info(f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!")
                 request.htcondor_job_id = htcondor_job_id
         elif message["type"] == "workflow-complete":
             status = WorkflowRequestState.Complete.name
@@ -631,8 +648,7 @@ class WorkflowMessageHandler:
             except Exception as exc:
                 transaction.abort()
                 logger.error(
-                    f"Failed to update status on workflow request "
-                    f"{request.workflow_request_id} to {status}: {exc}"
+                    f"Failed to update status on workflow request {request.workflow_request_id} to {status}: {exc}"
                 )
         else:
             logger.warning(f"Message {message} does not concern a workflow request. Ignoring.")
@@ -642,24 +658,19 @@ class WorkflowMessageHandler:
         wf_id = subject["workflow_request_id"]
         wf_request = self.info.lookup_workflow_request(wf_id)
 
-        if (
-            wf_request.workflow_name == ArchiveWorkflows.CARTA.value
-            and wf_request.argument["notify_ready"] is False
-        ):
-            logger.info(
-                f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!"
-            )
+        if wf_request.workflow_name == ArchiveWorkflows.CARTA.value and wf_request.argument["notify_ready"] is False:
+            logger.info(f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!")
             routing_key = f"ws-workflow.carta-instance-ready.{wf_id}"
-            carta_url_msg = ArchiveMessageArchitect(
-                routing_key=routing_key, request=wf_request
-            ).compose_message("carta_failed")
+            carta_url_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
+                "carta_failed"
+            )
             self.archive_messenger.send_message(**carta_url_msg)
         if wf_request.workflow_name == ArchiveWorkflows.SECI.value:
             logger.info(f"SENDING FAILED SECI MESSAGE to VLASS Manager for request #{wf_id}!")
             routing_key = f"ws-workflow.seci.{wf_id}"
-            seci_msg = ArchiveMessageArchitect(
-                routing_key=routing_key, request=wf_request
-            ).compose_message("seci_failed")
+            seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
+                "seci_failed"
+            )
             self.archive_messenger.send_message(**seci_msg)
 
     def send_archive_complete_event(self, **message: Dict):
@@ -670,9 +681,9 @@ class WorkflowMessageHandler:
         if wf_request.workflow_name == ArchiveWorkflows.SECI.value:
             logger.info(f"SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!")
             routing_key = f"ws-workflow.seci.{wf_id}"
-            seci_msg = ArchiveMessageArchitect(
-                routing_key=routing_key, request=wf_request
-            ).compose_message("seci_complete")
+            seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
+                "seci_complete"
+            )
            self.archive_messenger.send_message(**seci_msg)
 
     def clean_remote_workflow(self, request: WorkflowRequestIF):
-- 
GitLab
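
Note on the migration's templating: in null_dag_job_submit, the "arguments = {}" line is a Python str.format() placeholder, not HTCondor syntax. The HTCondor macros ($(jobname), $(SBIN_PATH), $ENV(CAPO_PROFILE)) use parentheses, so format() passes them through untouched. For example, null_dag_job_submit.format("--greeting") renders the GREET node's submit file as:

    executable = null_dag.sh
    arguments = --greeting

    output = null_dag.$(jobname).out
    error = null_dag.$(jobname).err
    log = null_dag.$(jobname).log

    SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
    should_transfer_files = yes
    transfer_input_files = $(SBIN_PATH)/null

    queue

The accompanying null_dag.dag then chains the three nodes linearly through its PARENT/CHILD lines: GREET runs first, then NAP, then VBOSE_GREET, each node invoking the same null executable through null_dag.sh with different flags.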
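
Note on the new submission path: _execute_prepared now prefers DAGMan, scanning the rendered folder for a *.dag file and falling back to the first *.condor submit file only when no DAG is present. Below is a minimal self-contained sketch of that dispatch rule, for illustration only: submit_folder is a hypothetical name, error handling is omitted, the stem-based log name ignores the optional "log = " entry that the real job helper honors, and condor_submit/condor_submit_dag are assumed to be on the PATH.

    from pathlib import Path
    import subprocess


    def submit_folder(folder: Path) -> Path:
        """Sketch of the .dag-before-.condor dispatch introduced by this patch."""
        dag_files = sorted(folder.glob("*.dag"))
        if dag_files:
            # DAGMan derives its log name from the DAG file: <name>.dag.dagman.log
            subprocess.run(["condor_submit_dag", "-f", dag_files[0].name], cwd=folder)
            return folder / f"{dag_files[0].name}.dagman.log"
        # no DAG present: submit the lone *.condor job file (IndexError if absent)
        job_file = sorted(folder.glob("*.condor"))[0]
        subprocess.run(["condor_submit", job_file.name], cwd=folder)
        return folder / f"{job_file.stem}.log"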
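
The two logfile helpers encode two different HTCondor conventions: a plain job logs wherever its submit file's "log = " entry points, defaulting to <jobfile stem>.log when the entry is absent, while DAGMan always logs to <dagfile>.dagman.log. An illustrative interpreter session mirroring the new unit tests; it assumes an empty fake.submit exists on disk (the job helper reads the submit file) and shows output for a POSIX host:

    >>> from pathlib import Path
    >>> WorkflowService._get_job_logfile_name(Path("fake.submit"))
    PosixPath('fake.log')
    >>> WorkflowService._get_dag_logfile_name(Path("null_dag.dag"))
    PosixPath('null_dag.dag.dagman.log')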