Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
......@@ -2,9 +2,9 @@ from __future__ import annotations
from typing import Dict, List
from workspaces.system.schema import JSONSerializable
from workspaces.capability.helpers_interfaces import CapabilityStepIF, ParameterIF
from workspaces.products.schema_interfaces import FutureProductIF
from workspaces.system.schema import JSONSerializable
from workspaces.workflow.schema_interfaces import WorkflowRequestIF
......@@ -48,3 +48,6 @@ class CapabilityExecutionIF:
def on_last_step(self) -> bool:
raise NotImplementedError
def __json__(self):
raise NotImplementedError
......@@ -66,7 +66,9 @@ class ExecutionManager:
@on_message(service="workflow", type="workflow-complete")
def send_step_complete(self, **message: Dict):
subject = message["subject"]
execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"])
execution = self.capability_info.lookup_execution_by_workflow_request_id(
subject["workflow_request_id"]
)
step_complete_msg = {
"service": "capability",
"routing_key": "capability",
......@@ -88,14 +90,13 @@ class ExecutionManager:
execution_id = subject["id"]
execution = self.capability_info.lookup_execution(execution_id)
else:
logger.error("Unrecognized message type.")
logger.error(f"Unrecognized message type: {subject['type']}.")
return
step_sequence = CapabilitySequence.from_str(execution.steps)
current_step = step_sequence[execution.current_step]
# Check if previous step (that just completed) is run workflow step
if current_step.step_type == CapabilityStepType.AwaitWorkflow:
# Return capability engine to available state
# Workflow completed; return capability engine to available state
self.queues[execution.capability.name].complete_execution(execution_id)
logger.info(">>> WORKFLOW COMPLETE! SETTING EXECUTION TO WAITING")
execution.state = ExecutionState.Waiting.name
......@@ -117,8 +118,8 @@ class ExecutionManager:
:return: Execution with updated metadata
"""
execution.current_step += 1
current_step = step_sequence[execution.current_step]
current_step = step_sequence[execution.current_step]
if current_step.step_type == CapabilityStepType.PrepareAndRunWorkflow:
# Enqueue execution that is on a run workflow step
# FIXME: Priority needs to be dynamic; perhaps a priority column in execution schema?
......
......@@ -5,6 +5,7 @@
-e ../packages/shared/messaging
-e ../packages/shared/workspaces
-e ../packages/apps/cli/utilities/wf_monitor
-e ../packages/apps/cli/utilities/ws_metrics
-e ../packages/apps/cli/executables/datafetcher
-e ../packages/apps/cli/executables/delivery
-e ../packages/apps/cli/executables/null
......
......@@ -12,24 +12,21 @@ pytest_plugins = ["testing.utils.conftest"]
3. If you want a non-fixture item from here, you'll have to do a normal Python import
"""
import json
from typing import List
from datetime import datetime
from typing import Any, Callable, List, Union
from unittest import mock
from unittest.mock import patch, Mock
from unittest.mock import patch
import pendulum
import pytest
import workspaces
from workspaces.workflow.schema import WorkflowRequest, Workflow, WorkflowRequestFile
from messaging.messenger import Messenger
from messaging.router import Router
from mock_alchemy.mocking import UnifiedAlchemyMagicMock
from workspaces.capability.schema import (
Capability,
CapabilityExecution,
CapabilityRequest,
CapabilityVersion,
)
from workspaces.capability.schema_interfaces import (
CapabilityExecutionIF,
......@@ -38,7 +35,13 @@ from workspaces.capability.schema_interfaces import (
from workspaces.capability.services.capability_engine import CapabilityEngine
from workspaces.capability.services.capability_info import CapabilityInfo
from workspaces.capability.services.capability_service import CapabilityService
from workspaces.capability.services.interfaces import CapabilityEngineIF
from workspaces.capability.services.interfaces import (
CapabilityEngineIF,
CapabilityInfoIF,
)
from workspaces.workflow.schema import Workflow, WorkflowRequest
from workspaces.workflow.schema_interfaces import WorkflowRequestIF
from workspaces.workflow.services.interfaces import WorkflowInfoIF
from workspaces.workflow.services.workflow_info import WorkflowInfo
from workspaces.workflow.services.workflow_service import WorkflowService
......@@ -48,7 +51,6 @@ def mock_capability_info() -> CapabilityInfo:
"""
Pytest fixture providing a mock CapabilityInfo object with a test capability; mocked object can be used
like a normal CapabilityInfo instance
TODO: Use UnifiedAlchemyMagicMock's data parameter to add test capabilities
:return: Mocked CapabilityInfo
"""
......@@ -66,25 +68,31 @@ def mock_capability_info() -> CapabilityInfo:
[mock.call.query(Capability), mock.call.filter_by(name="test")],
[test_capability],
)
null_capability_request = CapabilityRequest(
id=1,
state="Ready",
capability=null_capability,
parameters="-g",
# a trick here is to ensure that we always have a first version, with the original parameters
versions=[CapabilityVersion(version_number=1, parameters="-g")],
)
null_capability_request_query = (
[mock.call.query(CapabilityRequest), mock.call.filter_by(id=1)],
[null_capability_request],
)
mocked_capability_info = CapabilityInfo(
UnifiedAlchemyMagicMock(
data=[null_capability_query, test_capability_query, null_capability_request_query],
data=[
null_capability_query,
test_capability_query,
],
)
)
return mocked_capability_info
with patch("workspaces.capability.services.capability_info.CapabilityInfo.save_entity"):
yield mocked_capability_info
def add_data_to_db_mock(db_mock: Union[CapabilityInfoIF, WorkflowInfoIF], data: Any):
"""
Add a query that returns data to a mocked database access object
:param db_mock: Mocked data access object (either CapabilityInfo or WorkflowInfo)
:param query: Query that should return the given data
EX: session.query(Capability).filter_by(name=capability_name).first() turns into
[mock.call.query(Capability), mock.call.filter_by(name=capability_name), mock.call.first()]
:param data: List of data to return from query
"""
db_mock.session.add(data)
@pytest.fixture(scope="session")
......@@ -116,26 +124,13 @@ def mock_workflow_info() -> WorkflowInfo:
null_workflow_query = (
[mock.call.query(Workflow), mock.call.filter_by(name="null")],
[null_workflow]
[null_workflow],
)
null_request = WorkflowRequest(
workflow_request_id=1,
workflow_name=null_workflow.workflow_name,
argument="-n",
state="Created",
results_dir="",
created_at=pendulum.now(),
updated_at=pendulum.now()
)
null_request_query = (
[mock.call.query(WorkflowRequest), mock.call.filter_by(id=1)],
[null_request]
)
mock_workflow_info = WorkflowInfo(UnifiedAlchemyMagicMock(data=[null_workflow_query, null_request_query]))
return mock_workflow_info
mock_session = UnifiedAlchemyMagicMock(data=[null_workflow_query])
mock_workflow_info = WorkflowInfo(mock_session)
with patch("workspaces.workflow.services.workflow_info.WorkflowInfo.save_request"):
yield mock_workflow_info
@pytest.fixture(scope="session")
......@@ -150,42 +145,79 @@ def mock_workflow_service(mock_workflow_info: WorkflowInfo) -> WorkflowService:
@pytest.fixture(scope="session")
def mock_capability_request(
def mock_capability_requests(
mock_capability_service: CapabilityService,
mock_capability_info: CapabilityInfoIF,
) -> List[CapabilityRequestIF]:
"""
Pytest fixture adding two mock CapabilityRequest objects that can be accessed from the mock_capability_info
:param mock_capability_service: Mocked CapabilityService
:param mock_capability_info: Mocked CapabilityInfo
:return: Mocked CapabilityRequest
"""
requests = []
r1 = mock_capability_service.create_request("null", ["-g"])
r1.id = 0
r1.id = -1
r1.created_at = datetime(1996, 9, 10)
r1.updated_at = datetime(1996, 9, 10)
requests.append(r1)
add_data_to_db_mock(mock_capability_info, r1)
r2 = mock_capability_service.create_request("test")
r2.id = 1
r2.id = -2
r2.created_at = datetime(1996, 9, 10)
r2.updated_at = datetime(1996, 9, 10)
requests.append(r2)
add_data_to_db_mock(mock_capability_info, r2)
return requests
@pytest.fixture(scope="session")
def mock_workflow_requests(
mock_workflow_service: WorkflowService, mock_workflow_info: WorkflowInfoIF
) -> List[WorkflowRequestIF]:
requests = []
r1 = WorkflowRequest(
workflow_request_id=-1,
workflow_name="null",
argument={"parameters": "-g"},
state="Ready",
created_at=datetime(1996, 9, 10),
updated_at=datetime(1996, 9, 10),
)
requests.append(r1)
add_data_to_db_mock(mock_workflow_info, r1)
return requests
@pytest.fixture(scope="session")
def mock_capability_execution(
mock_capability_info: CapabilityInfo, mock_capability_request: CapabilityRequest
mock_capability_info: CapabilityInfo, mock_capability_requests
) -> CapabilityExecutionIF:
"""
Pytest fixture adding a mock CapabilityExecution object that can be
accessed from mock_capability_info
:param mock_capability_request: Mocked CapabilityRequest
:param mock_capability_requests: Mocked CapabilityRequest
:param mock_capability_info: Mocked CapabilityInfo
:return: Mocked CapabilityExecution
"""
request = mock_capability_request[0]
request = mock_capability_requests[0]
execution = mock_capability_info.create_execution(request)
execution.id = 0
execution.current_workflow_request_id = -1
add_data_to_db_mock(mock_capability_info, execution)
execution = mock_capability_info.create_execution(
mock_capability_info.lookup_capability_request(request)
)
return execution
......@@ -193,6 +225,7 @@ def mock_capability_execution(
def mock_capability_engine() -> CapabilityEngineIF:
"""
Pytest fixture adding a mock CapabilityEngine object
:return: mocked engine
"""
......