Commit ff287fb3 authored by Nathan Hertz

Attempt to abort running executions on QaPass

parent 3392e392
Pipeline #3652 canceled
@@ -293,6 +293,19 @@ class WorkflowRequestRestService:
# 4. submit ingestion workflow request
self.request.workflows.execute(ingest_request)
@view_config(request_method="POST", route_name="abort_workflow_request")
def abort(self):
"""
Given the ID of a workflow request, abort the execution of the request if it is running
"""
request_id = self.request.matchdict["request_id"]
self.request.workflows.abort_running_workflow(request_id)
return Response(
status_code=http.HTTPStatus.OK,
body=f"SUCCESS: Sent signal to abort running workflow request #{request_id}.",
)
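# Illustrative usage (not part of this commit): any HTTP client can hit the new
# abort endpoint with a plain POST. The base URL below is a placeholder for the
# workflow service address, and request #42 is a made-up ID.
# >>> import requests
# >>> workflow_url = "http://localhost:3456"  # hypothetical service URL
# >>> response = requests.post(f"{workflow_url}/workflows/requests/42/abort")
# >>> assert response.status_code == http.HTTPStatus.OK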
@view_config(request_method="POST", route_name="send_carta_url_to_aat")
def send_carta_url_to_aat(self):
"""
@@ -572,6 +585,12 @@ def main(global_config, **settings):
"/workflows/{name}/requests/{request_id}/ingest",
factory=lookup_request,
)
# Use this route to abort a running workflow
config.add_route(
"abort_workflow_request",
"/workflows/requests/{request_id}/abort",
factory=lookup_request,
)
# Use this route to send a Do Not Calibrate message to the AAT system
config.add_route(
......
@@ -40,7 +40,6 @@ from workspaces.capability.schema import (
QaPass,
)
from workspaces.capability.services.capability_info import CapabilityInfo
from workspaces.capability.services.execution_manager import ExecutionManager
from .conftest import (
SAMPLE_CAPABILITY_NAMES,
@@ -51,6 +50,8 @@ from .conftest import (
pytest_plugins = ["testing.utils.conftest"]
FAKE_WORKFLOW_URL = "http://fake-workflow.edu"
# Register Capability JSON blueprint as a hypothesis type strategy
# To use:
@@ -159,11 +160,10 @@ st.register_type_strategy(
@given(
st.one_of(st.lists(st.text()), st.none()),
st.from_type(Capability),
st.integers(min_value=1, max_value=10),
st.integers(min_value=2, max_value=10),
)
def test_qa_pass(
mock_capability_info: CapabilityInfo,
dummy_execution_manager: ExecutionManager,
parameters: Optional[List[str]],
capability_json: Dict,
number_of_versions: int,
@@ -182,12 +182,26 @@ def test_qa_pass(
# Latest version is always the one to pass
version_to_pass = expected_versions[-1]
passing_execution = mock_capability_info.create_execution(expected_request)
passing_execution.version = version_to_pass
passing_execution.version_number = version_to_pass.version_number
passing_execution.state_name = "Awaiting QA"
version_to_fail = expected_versions[0]
failing_execution = mock_capability_info.create_execution(expected_request)
failing_execution.version = version_to_fail
failing_execution.version_number = version_to_fail.version_number
failing_execution.state_name = "Executing"
failing_execution.current_workflow_request_id = -1
assert len(expected_versions) == number_of_versions
QaPass()(passing_execution, dummy_execution_manager)
with patch("workspaces.capability.schema.requests.post") as mock_post:
with patch("workspaces.capability.schema.CapoConfig") as mock_capo_config:
mock_capo_config.return_value.settings.return_value.serviceUrl = FAKE_WORKFLOW_URL
QaPass()(passing_execution)
# Assert the failing execution's workflow request was aborted
mock_post.assert_called_with(
f"{FAKE_WORKFLOW_URL}/workflows/requests/{failing_execution.current_workflow_request_id}/abort"
)
# Assert all versions except the passing one are failed and the passing one is complete
for expected_version in expected_versions:
@@ -201,12 +215,11 @@ def test_qa_pass(
@given(st.from_type(CapabilityExecution))
def test_announce_qa(execution_json: Dict):
execution = CapabilityExecution.from_json(execution_json)
fake_workflow_url = "http://fake-workflow.edu"
with patch("workspaces.capability.schema.requests.post") as mock_post:
with patch("workspaces.capability.schema.CapoConfig") as mock_capo_config:
mock_capo_config.return_value.settings.return_value.serviceUrl = fake_workflow_url
mock_capo_config.return_value.settings.return_value.serviceUrl = FAKE_WORKFLOW_URL
AnnounceQa()(execution)
mock_post.assert_called_with(
f"{fake_workflow_url}/workflows/requests/{execution.current_workflow_request_id}/qa/qa_ready"
f"{FAKE_WORKFLOW_URL}/workflows/requests/{execution.current_workflow_request_id}/qa/qa_ready"
)
@@ -23,13 +23,16 @@ import os
import pathlib
import shutil
import sys
from typing import List
from typing import Dict, List
# pylint: disable=E0401, R0201, W0212
from unittest.mock import MagicMock, patch
import pytest
from hypothesis import given
from hypothesis import strategies as st
from workspaces.workflow.enum import WorkflowRequestState
from workspaces.workflow.schema import Workflow, WorkflowRequest, WorkflowRequestFile
from workspaces.workflow.services.workflow_info import WorkflowInfo
from workspaces.workflow.services.workflow_service import WorkflowService
@@ -45,6 +48,33 @@ Tests for WorkflowService
"""
# Register WorkflowRequest JSON blueprint as a hypothesis type strategy
# To use:
# >>> @given(st.from_type(WorkflowRequest))
# >>> def test(generated_request_json: Dict):
# >>> request = WorkflowRequest.from_json(generated_request_json)
st.register_type_strategy(
WorkflowRequest,
st.fixed_dictionaries(
{
"type": st.just("WorkflowRequest"),
"workflow_request_id": st.integers(min_value=1, max_value=100),
"workflow_name": st.sampled_from(SAMPLE_WORKFLOW_NAMES),
"argument": st.one_of(st.lists(st.text()), st.none()),
"state": st.sampled_from([name for name, _ in WorkflowRequestState.__members__.items()]),
"results_dir": st.text(min_size=10),
"files": st.none(),
"created_at": st.datetimes().map(
lambda time: time.isoformat(),
),
"updated_at": st.datetimes().map(
lambda time: time.isoformat(),
),
}
),
)
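# Quick sanity check of the blueprint outside a test run (illustrative only:
# hypothesis's .example() is an interactive convenience and should not be called
# inside tests, and the attribute access below assumes from_json maps blueprint
# keys onto attributes of the same name).
# >>> sample_json = st.from_type(WorkflowRequest).example()
# >>> sample_request = WorkflowRequest.from_json(sample_json)
# >>> assert sample_request.workflow_name in SAMPLE_WORKFLOW_NAMES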
files = [
WorkflowRequestFile(
workflow_request_id=-1,
@@ -105,7 +135,7 @@ class TestWorkflowService:
mock_workflow_requests: List[WorkflowRequest],
):
"""
Is a file attached to workflow requests as expected?
Is a file attached to workflow requests as expected?
:param mock_workflow_info:
:param mock_workflow_requests:
@@ -260,3 +290,12 @@ class TestWorkflowService:
def test_get_dagfile_log(self, mock_workflow_service: WorkflowService):
log_path = mock_workflow_service._get_dag_logfile_name(pathlib.Path("fake.dag"))
assert log_path.name == "fake.dag.dagman.log"
@given(st.from_type(WorkflowRequest))
def test_abort_running_workflow(self, mock_workflow_service: WorkflowService, workflow_request_json: Dict):
workflow_request = WorkflowRequest.from_json(workflow_request_json)
raise NotImplementedError
# with clear_test_database():
# mock_workflow_service.abort_running_workflow(workflow_request.workflow_request_id)
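# A possible shape for this test once abort_running_workflow is implemented,
# building on the commented-out lines above (a sketch only; the final state
# assertion depends on abort semantics this commit does not yet define):
# >>> with clear_test_database():
# >>>     mock_workflow_service.abort_running_workflow(workflow_request.workflow_request_id)
# >>>     # e.g. assert the request has left its running state afterwards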
@@ -169,18 +169,27 @@ class QaPass(Action):
for version in versions:
if version.__json__() == request.current_version.__json__():
# Mark current version as complete
print(f"Passing version: {version.__json__()}")
version.state = CapabilityVersionState.Complete.name
else:
# Mark all other versions as failed
print(f"Failing version: {version.__json__()}")
version.state = CapabilityVersionState.Failed.name
# Abort executions of version
self._abort_running_executions(version)
def _abort_running_executions(self, request: CapabilityRequest):
def _abort_running_executions(self, version: CapabilityVersion):
"""
Given a capability request, find all running executions associated with it and abort them
Given a capability version, abort all of its running executions
:param request: Capability request
:param version: Capability version
"""
raise NotImplementedError
workflow_service_url = CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl
for execution in version.executions:
workflow_request_id = execution.current_workflow_request_id
logger.info(f"Attempting to abort running execution #{execution.id}.")
requests.post(f"{workflow_service_url}/workflows/requests/{workflow_request_id}/abort")
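# An optional hardening of the loop above (not in this commit, and the
# "Executing" state name is an assumption borrowed from the tests): skip
# executions that are not running and survive an unreachable workflow service.
# >>> for execution in version.executions:
# >>>     if execution.state_name != "Executing":
# >>>         continue  # nothing to abort for queued or finished executions
# >>>     try:
# >>>         requests.post(f"{workflow_service_url}/workflows/requests/{execution.current_workflow_request_id}/abort")
# >>>     except requests.RequestException:
# >>>         logger.warning("Could not reach workflow service to abort execution #%s", execution.id)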
def __call__(self, execution: CapabilityExecutionIF, *args):
logger.info("Execution #%s has passed QA", execution.id)
......