From ff287fb3f31ee9ce83e80a18d10ba6902769a1fe Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Fri, 12 Nov 2021 12:05:27 -0700 Subject: [PATCH] Executions are now attempted to be aborted on QaPass --- services/workflow/workflow/server.py | 19 ++++++++ .../test/test_capability_actions.py | 27 +++++++++--- .../workspaces/test/test_workflow_service.py | 43 ++++++++++++++++++- .../workspaces/capability/schema.py | 17 ++++++-- 4 files changed, 93 insertions(+), 13 deletions(-) diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index c80f0ac4e..c805304a2 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -293,6 +293,19 @@ class WorkflowRequestRestService: # 4. submit ingestion workflow request self.request.workflows.execute(ingest_request) + @view_config(request_method="POST", route_name="abort_workflow_request") + def abort(self): + """ + Given the ID of a workflow request, abort the execution of the request if it is running + """ + request_id = self.request.matchdict["request_id"] + self.request.workflows.abort_running_workflow(request_id) + + return Response( + status_code=http.HTTPStatus.OK, + body=f"SUCCESS: Sent signal to abort running workflow request #{request_id}.", + ) + @view_config(request_method="POST", route_name="send_carta_url_to_aat") def send_carta_url_to_aat(self): """ @@ -572,6 +585,12 @@ def main(global_config, **settings): "/workflows/{name}/requests/{request_id}/ingest", factory=lookup_request, ) + # Use this route to abort a running workflow + config.add_route( + "abort_workflow_request", + "/workflows/requests/{request_id}/abort", + factory=lookup_request, + ) # Use this route to send a Do Not Calibrate message to the AAT system config.add_route( diff --git a/shared/workspaces/test/test_capability_actions.py b/shared/workspaces/test/test_capability_actions.py index 81a209a0b..b83d871e3 100644 --- a/shared/workspaces/test/test_capability_actions.py +++ b/shared/workspaces/test/test_capability_actions.py @@ -40,7 +40,6 @@ from workspaces.capability.schema import ( QaPass, ) from workspaces.capability.services.capability_info import CapabilityInfo -from workspaces.capability.services.execution_manager import ExecutionManager from .conftest import ( SAMPLE_CAPABILITY_NAMES, @@ -51,6 +50,8 @@ from .conftest import ( pytest_plugins = ["testing.utils.conftest"] +FAKE_WORKFLOW_URL = "http://fake-workflow.edu" + # Register Capability JSON blueprint as a hypothesis type strategy # To use: @@ -159,11 +160,10 @@ st.register_type_strategy( @given( st.one_of(st.lists(st.text()), st.none()), st.from_type(Capability), - st.integers(min_value=1, max_value=10), + st.integers(min_value=2, max_value=10), ) def test_qa_pass( mock_capability_info: CapabilityInfo, - dummy_execution_manager: ExecutionManager, parameters: Optional[List[str]], capability_json: Dict, number_of_versions: int, @@ -182,12 +182,26 @@ def test_qa_pass( # Latest version is always the one to pass version_to_pass = expected_versions[-1] passing_execution = mock_capability_info.create_execution(expected_request) + passing_execution.version = version_to_pass passing_execution.version_number = version_to_pass.version_number passing_execution.state_name = "Awaiting QA" + version_to_fail = expected_versions[0] + failing_execution = mock_capability_info.create_execution(expected_request) + failing_execution.version = version_to_fail + failing_execution.version_number = version_to_fail.version_number + failing_execution.state_name = "Executing" + failing_execution.current_workflow_request_id = -1 assert len(expected_versions) == number_of_versions - QaPass()(passing_execution, dummy_execution_manager) + with patch("workspaces.capability.schema.requests.post") as mock_post: + with patch("workspaces.capability.schema.CapoConfig") as mock_capo_config: + mock_capo_config.return_value.settings.return_value.serviceUrl = FAKE_WORKFLOW_URL + QaPass()(passing_execution) + # Assert failing execution has its execution aborted + mock_post.assert_called_with( + f"{FAKE_WORKFLOW_URL}/workflows/requests/{failing_execution.current_workflow_request_id}/abort" + ) # Assert all versions except passing one are failed and passing one is complete for expected_version in expected_versions: @@ -201,12 +215,11 @@ def test_qa_pass( @given(st.from_type(CapabilityExecution)) def test_announce_qa(execution_json: Dict): execution = CapabilityExecution.from_json(execution_json) - fake_workflow_url = "http://fake-workflow.edu" with patch("workspaces.capability.schema.requests.post") as mock_post: with patch("workspaces.capability.schema.CapoConfig") as mock_capo_config: - mock_capo_config.return_value.settings.return_value.serviceUrl = fake_workflow_url + mock_capo_config.return_value.settings.return_value.serviceUrl = FAKE_WORKFLOW_URL AnnounceQa()(execution) mock_post.assert_called_with( - f"{fake_workflow_url}/workflows/requests/{execution.current_workflow_request_id}/qa/qa_ready" + f"{FAKE_WORKFLOW_URL}/workflows/requests/{execution.current_workflow_request_id}/qa/qa_ready" ) diff --git a/shared/workspaces/test/test_workflow_service.py b/shared/workspaces/test/test_workflow_service.py index c03ef8c3c..831a4a7a0 100644 --- a/shared/workspaces/test/test_workflow_service.py +++ b/shared/workspaces/test/test_workflow_service.py @@ -23,13 +23,16 @@ import os import pathlib import shutil import sys -from typing import List +from typing import Dict, List # pylint: disable=E0401, R0201, W0212 from unittest.mock import MagicMock, patch import pytest +from hypothesis import given +from hypothesis import strategies as st +from workspaces.workflow.enum import WorkflowRequestState from workspaces.workflow.schema import Workflow, WorkflowRequest, WorkflowRequestFile from workspaces.workflow.services.workflow_info import WorkflowInfo from workspaces.workflow.services.workflow_service import WorkflowService @@ -45,6 +48,33 @@ Tests for WorkflowService """ +# Register WorkflowRequest JSON blueprint as a hypothesis type strategy +# To use: +# >>> @given(st.from_type(WorkflowRequest)) +# >>> def test(generated_request_json: Dict): +# >>> request = WorkflowRequest.from_json(generated_request_json) +st.register_type_strategy( + WorkflowRequest, + st.fixed_dictionaries( + { + "type": st.just("WorkflowRequest"), + "workflow_request_id": st.integers(min_value=1, max_value=100), + "workflow_name": st.sampled_from(SAMPLE_WORKFLOW_NAMES), + "argument": st.one_of(st.lists(st.text()), st.none()), + "state": st.sampled_from([name for name, _ in WorkflowRequestState.__members__.items()]), + "results_dir": st.text(min_size=10), + "files": st.none(), + "created_at": st.datetimes().map( + lambda time: time.isoformat(), + ), + "updated_at": st.datetimes().map( + lambda time: time.isoformat(), + ), + } + ), +) + + files = [ WorkflowRequestFile( workflow_request_id=-1, @@ -105,7 +135,7 @@ class TestWorkflowService: mock_workflow_requests: List[WorkflowRequest], ): """ - Is a file attached to workflow requests as expected? + Is a file attached tto workflow requests as expected? :param mock_workflow_info: :param mock_workflow_requests: @@ -260,3 +290,12 @@ class TestWorkflowService: def test_get_dagfile_log(self, mock_workflow_service: WorkflowService): log_path = mock_workflow_service._get_dag_logfile_name(pathlib.Path("fake.dag")) assert log_path.name == "fake.dag.dagman.log" + + @given(st.from_type(WorkflowRequest)) + def test_abort_running_workflow(self, mock_workflow_service: WorkflowService, workflow_request_json: Dict): + workflow_request = WorkflowRequest.from_json(workflow_request_json) + + raise NotImplementedError + + # with clear_test_database(): + # mock_workflow_service.abort_running_workflow(workflow_request.workflow_request_id) diff --git a/shared/workspaces/workspaces/capability/schema.py b/shared/workspaces/workspaces/capability/schema.py index 106b6ac53..4e0020486 100644 --- a/shared/workspaces/workspaces/capability/schema.py +++ b/shared/workspaces/workspaces/capability/schema.py @@ -169,18 +169,27 @@ class QaPass(Action): for version in versions: if version.__json__() == request.current_version.__json__(): # Mark current version as complete + print(f"Passing version: {version.__json__()}") version.state = CapabilityVersionState.Complete.name else: # Mark all other versions as failed + print(f"Failing version: {version.__json__()}") version.state = CapabilityVersionState.Failed.name + # Abort executions of version + self._abort_running_executions(version) - def _abort_running_executions(self, request: CapabilityRequest): + def _abort_running_executions(self, version: CapabilityVersion): """ - Given a capability request, find all running executions associated with it and abort them + Given a capability version, fail all running executions - :param request: Capability request + :param version: Capability version """ - raise NotImplementedError + workflow_service_url = CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl + + for execution in version.executions: + workflow_request_id = execution.current_workflow_request_id + logger.info(f"Attempting to abort running execution #{execution.id}.") + requests.post(f"{workflow_service_url}/workflows/requests/{workflow_request_id}/abort") def __call__(self, execution: CapabilityExecutionIF, *args): logger.info("Execution #%s has passed QA", execution.id) -- GitLab