From 357cca10614460cbd3e61a6f43c66ef1aaf6356e Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Fri, 12 Nov 2021 12:17:55 -0700 Subject: [PATCH] Re-commit: Executions of failed versions are now attempted to be aborted --- services/workflow/workflow/server.py | 19 +++++++++++++ .../test/test_capability_actions.py | 27 ++++++++++++++----- .../workspaces/capability/schema.py | 17 +++++++++--- .../workflow/services/workflow_service.py | 16 +++++++++++ 4 files changed, 68 insertions(+), 11 deletions(-) diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index 175c3955f..158dacdc5 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -293,6 +293,19 @@ class WorkflowRequestRestService: # 4. submit ingestion workflow request self.request.workflows.execute(ingest_request) + @view_config(request_method="POST", route_name="abort_workflow_request") + def abort(self): + """ + Given the ID of a workflow request, abort the execution of the request if it is running + """ + request_id = self.request.matchdict["request_id"] + self.request.workflows.abort_running_workflow(request_id) + + return Response( + status_code=http.HTTPStatus.OK, + body=f"SUCCESS: Sent signal to abort running workflow request #{request_id}.", + ) + @view_config(request_method="POST", route_name="qa_fail") def qa_fail(self): """ @@ -596,6 +609,12 @@ def main(global_config, **settings): "/workflows/{name}/requests/{request_id}/ingest", factory=lookup_request, ) + # Use this route to abort a running workflow + config.add_route( + "abort_workflow_request", + "/workflows/requests/{request_id}/abort", + factory=lookup_request, + ) # Use this route to send a Do Not Calibrate message to the AAT system config.add_route( diff --git a/shared/workspaces/test/test_capability_actions.py b/shared/workspaces/test/test_capability_actions.py index 81a209a0b..b83d871e3 100644 --- a/shared/workspaces/test/test_capability_actions.py +++ b/shared/workspaces/test/test_capability_actions.py @@ -40,7 +40,6 @@ from workspaces.capability.schema import ( QaPass, ) from workspaces.capability.services.capability_info import CapabilityInfo -from workspaces.capability.services.execution_manager import ExecutionManager from .conftest import ( SAMPLE_CAPABILITY_NAMES, @@ -51,6 +50,8 @@ from .conftest import ( pytest_plugins = ["testing.utils.conftest"] +FAKE_WORKFLOW_URL = "http://fake-workflow.edu" + # Register Capability JSON blueprint as a hypothesis type strategy # To use: @@ -159,11 +160,10 @@ st.register_type_strategy( @given( st.one_of(st.lists(st.text()), st.none()), st.from_type(Capability), - st.integers(min_value=1, max_value=10), + st.integers(min_value=2, max_value=10), ) def test_qa_pass( mock_capability_info: CapabilityInfo, - dummy_execution_manager: ExecutionManager, parameters: Optional[List[str]], capability_json: Dict, number_of_versions: int, @@ -182,12 +182,26 @@ def test_qa_pass( # Latest version is always the one to pass version_to_pass = expected_versions[-1] passing_execution = mock_capability_info.create_execution(expected_request) + passing_execution.version = version_to_pass passing_execution.version_number = version_to_pass.version_number passing_execution.state_name = "Awaiting QA" + version_to_fail = expected_versions[0] + failing_execution = mock_capability_info.create_execution(expected_request) + failing_execution.version = version_to_fail + failing_execution.version_number = version_to_fail.version_number + failing_execution.state_name = "Executing" + failing_execution.current_workflow_request_id = -1 assert len(expected_versions) == number_of_versions - QaPass()(passing_execution, dummy_execution_manager) + with patch("workspaces.capability.schema.requests.post") as mock_post: + with patch("workspaces.capability.schema.CapoConfig") as mock_capo_config: + mock_capo_config.return_value.settings.return_value.serviceUrl = FAKE_WORKFLOW_URL + QaPass()(passing_execution) + # Assert failing execution has its execution aborted + mock_post.assert_called_with( + f"{FAKE_WORKFLOW_URL}/workflows/requests/{failing_execution.current_workflow_request_id}/abort" + ) # Assert all versions except passing one are failed and passing one is complete for expected_version in expected_versions: @@ -201,12 +215,11 @@ def test_qa_pass( @given(st.from_type(CapabilityExecution)) def test_announce_qa(execution_json: Dict): execution = CapabilityExecution.from_json(execution_json) - fake_workflow_url = "http://fake-workflow.edu" with patch("workspaces.capability.schema.requests.post") as mock_post: with patch("workspaces.capability.schema.CapoConfig") as mock_capo_config: - mock_capo_config.return_value.settings.return_value.serviceUrl = fake_workflow_url + mock_capo_config.return_value.settings.return_value.serviceUrl = FAKE_WORKFLOW_URL AnnounceQa()(execution) mock_post.assert_called_with( - f"{fake_workflow_url}/workflows/requests/{execution.current_workflow_request_id}/qa/qa_ready" + f"{FAKE_WORKFLOW_URL}/workflows/requests/{execution.current_workflow_request_id}/qa/qa_ready" ) diff --git a/shared/workspaces/workspaces/capability/schema.py b/shared/workspaces/workspaces/capability/schema.py index c714e6efe..90f9f9af2 100644 --- a/shared/workspaces/workspaces/capability/schema.py +++ b/shared/workspaces/workspaces/capability/schema.py @@ -170,18 +170,27 @@ class QaPass(Action): for version in versions: if version.__json__() == request.current_version.__json__(): # Mark current version as complete + print(f"Passing version: {version.__json__()}") version.state = CapabilityVersionState.Complete.name else: # Mark all other versions as failed + print(f"Failing version: {version.__json__()}") version.state = CapabilityVersionState.Failed.name + # Abort executions of version + self._abort_running_executions(version) - def _abort_running_executions(self, request: CapabilityRequest): + def _abort_running_executions(self, version: CapabilityVersion): """ - Given a capability request, find all running executions associated with it and abort them + Given a capability version, fail all running executions - :param request: Capability request + :param version: Capability version """ - raise NotImplementedError + workflow_service_url = CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl + + for execution in version.executions: + workflow_request_id = execution.current_workflow_request_id + logger.info(f"Attempting to abort running execution #{execution.id}.") + requests.post(f"{workflow_service_url}/workflows/requests/{workflow_request_id}/abort") def __call__(self, execution: CapabilityExecutionIF, *args): logger.info("Execution #%s has passed QA", execution.id) diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index d44f09239..d424d7ec4 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -306,6 +306,22 @@ class WorkflowService(WorkflowServiceIF): return request + def abort_running_workflow(self, request_id: int): + """ + Given the ID for a running workflow request, attempt to abort the request's execution (using condor_rm) + + :param request_id: ID of running request + """ + workflow_request = self.info.lookup_workflow_request(request_id) + htcondor_job_id = workflow_request.htcondor_job_id + if htcondor_job_id: + subprocess.run(["condor_rm", f"{htcondor_job_id!s}"]) + else: + logger.warning( + f"Workflow request #{request_id} does not have an associated HTCondor job ID. " + f"No execution was aborted." + ) + def _make_temp_directory(self, request: WorkflowRequest) -> Path: """ Create tmp folder for workflow -- GitLab