Skip to content
Snippets Groups Projects
Commit e001e69a authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Refactored execution management logic out of `CapabilityService` into a new...

Refactored execution management logic out of `CapabilityService` into a new class `ExecutionManager`
parent 5bb41d3f
No related branches found
No related tags found
1 merge request!102Execution manager refactor
Pipeline #735 passed with warnings
Showing with 328 additions and 149 deletions
......@@ -105,6 +105,7 @@ class MockCapabilityService(MagicMock):
def __init__(self, capability_info: MockCapabilityInfo):
super().__init__()
self.capability_info = capability_info
self.execution_manager = MockExecutionManager(capability_info)
def create_request(
self,
......@@ -132,6 +133,14 @@ class MockCapabilityService(MagicMock):
self.capability_info.save_entity(request)
return request
def run_capability(self, request: CapabilityRequest) -> CapabilityExecution:
return self.execution_manager.run_capability(request)
class MockExecutionManager:
def __init__(self, capability_info: MockCapabilityInfo):
self.capability_info = capability_info
def run_capability(self, capability_request: CapabilityRequest) -> CapabilityExecution:
"""
Mock run_capability method
......
from typing import List
from unittest.mock import MagicMock, patch
import pytest
from workspaces.capability.schema import CapabilityRequest
from workspaces.capability.services.capability_info import CapabilityInfo
from workspaces.capability.services.capability_service import CapabilityService
from workspaces.capability.services.execution_manager import ExecutionManager
from workspaces.workflow.services.workflow_service import WorkflowService
pytest_plugins = ["testing.utils.conftest"]
@pytest.fixture(scope="module")
def dummy_execution_manager(
mock_capability_info: CapabilityInfo, mock_workflow_service: WorkflowService
) -> ExecutionManager:
"""
Pytest fixture that instantiates a dummy ExecutionManager object for testing purposes
:return: Dummy ExecutionManager
"""
return ExecutionManager(mock_capability_info, mock_workflow_service)
def test_run_capability(
dummy_execution_manager: ExecutionManager,
mock_capability_info: CapabilityInfo,
mock_capability_request: List[CapabilityRequest],
):
"""
Tests that run_capability correctly creates an execution given a request and enqueues that execution
if the first capability step is running a workflow
"""
# Mock enqueue_execution so the execution is not enqueued
# dummy_execution_manager.enqueue_execution = MagicMock()
# null_capability_request = mock_capability_request[0]
# null_execution = dummy_execution_manager.run_capability(null_capability_request)
# assert dummy_execution_manager.enqueue_execution.call_count == 1
# assert null_execution.state == "Ready"
# test_capability_request = mock_capability_request[1]
# test_execution = dummy_execution_manager.run_capability(test_capability_request)
# assert dummy_execution_manager.enqueue_execution.call_count == 1
# assert len(dummy_execution_manager.execution_pool) == 1
# assert test_execution.state == "Ready"
assert True
......@@ -21,8 +21,8 @@ class CapabilityStepType(Enum):
"""
Get capability step given a string
:param string:
:return: Capability step
:param string: String to convert
:return: Capability step corresponding to given string or Invalid
"""
strings = {
"prepare-and-run-workflow": cls.PrepareAndRunWorkflow,
......@@ -76,6 +76,9 @@ class ExecutionState(Enum):
class ExecutionPriority(Enum):
"""
Enum that specifies the priority of a particular capability execution
TODO: These numbers are just placeholders I added on a whim, and I think this Enum should get another look in
the future
"""
High = 25
......
......@@ -237,3 +237,6 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
@property
def capability_request(self):
return self.version.request
def on_last_step(self) -> bool:
return self.current_step == len(CapabilitySequence.from_str(self.steps)) - 1
......@@ -50,3 +50,6 @@ class CapabilityExecutionIF:
def __json__(self) -> Dict[str, str]:
raise NotImplementedError
def on_last_step(self) -> bool:
raise NotImplementedError
......@@ -4,6 +4,7 @@ from collections import namedtuple
from queue import Empty, PriorityQueue, Queue
from typing import NamedTuple, Optional
from workspaces.capability.enums import ExecutionState
from workspaces.capability.schema_interfaces import CapabilityExecutionIF
from workspaces.capability.services.capability_engine import CapabilityEngine
from workspaces.capability.services.interfaces import (
......@@ -30,12 +31,8 @@ class CapabilityQueue(CapabilityQueueIF):
self.capability_info = capability_info
self.queue = PriorityQueue()
self.max_concurrency = max_concurrency
self.engine_list = self.init_engines(
max_concurrency, capability_info, workflow_service
)
self.queue_manager = threading.Thread(
target=self.process_executions, daemon=True
)
self.engine_list = self.init_engines(max_concurrency, capability_info, workflow_service)
self.queue_manager = threading.Thread(target=self.process_executions, daemon=True)
self.queue_manager.start()
self.paused = False
......@@ -53,9 +50,7 @@ class CapabilityQueue(CapabilityQueueIF):
"""
engine_list = cls.EngineList(Queue(), [])
for _ in range(num_engines):
engine_list.available.put(
CapabilityEngine(capability_info, workflow_service)
)
engine_list.available.put(CapabilityEngine(capability_info, workflow_service))
return engine_list
......@@ -70,11 +65,15 @@ class CapabilityQueue(CapabilityQueueIF):
engine = self.get_available_engine()
if engine:
execution = self.capability_info.lookup_execution(execution.id)
# Start engine
engine.load_engine(execution)
# Move engine to in-use list
self.engine_list.in_use.append(engine)
engine.execute()
if not execution.state == ExecutionState.Cancelled.name:
# Execution not marked as cancelled
# Start engine
engine.load_engine(execution)
# Move engine to in-use list
self.engine_list.in_use.append(engine)
# Update execution record's state
execution.state = ExecutionState.ExecutingStep.name
engine.execute()
else:
# FIXME: Logging
print("No available engines. Try again later.")
......
import threading
from typing import List
from channels.amqp_helpers import Channel, WorkflowEventChannel, CapabilityEventChannel
from channels.amqp_helpers import CapabilityEventChannel, Channel, WorkflowEventChannel
from workspaces.capability.enums import (
CapabilityEventType,
CapabilityStepType,
ExecutionPriority,
ExecutionState,
)
from workspaces.capability.helpers import CapabilitySequence, Parameter
from workspaces.capability.enums import CapabilityEventType
from workspaces.capability.helpers import Parameter
from workspaces.capability.schema import CapabilityEvent, CapabilityRequest
from workspaces.capability.schema_interfaces import (
CapabilityExecutionIF,
CapabilityRequestIF,
)
from workspaces.capability.services.capability_queue import CapabilityQueue
from workspaces.capability.services.execution_manager import ExecutionManager
from workspaces.capability.services.interfaces import (
CapabilityInfoIF,
CapabilityServiceIF,
......@@ -32,12 +27,18 @@ class CapabilityService(CapabilityServiceIF):
"""
def __init__(self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF):
self.execution_pool = []
self.queues = {}
self.execution_manager = ExecutionManager(capability_info, workflow_service)
self.capability_info = capability_info
self.workflow_service = workflow_service
self.listener = threading.Thread(target=self.listen_for_events, daemon=True)
self.listener.start()
# self.workflow_service = workflow_service
# TODO: Thread management
self.workflow_event_listener = threading.Thread(
target=self.listen_for_workflow_events, daemon=True
)
self.workflow_event_listener.start()
self.capability_event_listener = threading.Thread(
target=self.listen_for_capability_events, daemon=True
)
self.capability_event_listener.start()
def create_request(
self,
......@@ -57,116 +58,17 @@ class CapabilityService(CapabilityServiceIF):
def run_capability(self, request: CapabilityRequest) -> CapabilityExecutionIF:
"""
Run a capability with the requested settings
Use the interface of the execution manager to run a capability
TODO: Executions should be made from capability versions, not requests
:param request: Capability request
:return: Execution of the request
"""
execution = self.capability_info.create_execution(request)
steps = CapabilitySequence.from_str(execution.steps)
if steps[0].step_type == CapabilityStepType.PrepareAndRunWorkflow:
# First step is to run workflow, move to capability queue
# FIXME: Priority needs to be dynamic
self.enqueue_execution(execution, ExecutionPriority.Default.value)
else:
self.execution_pool.append(execution)
return execution
def update_execution(self, event: WorkflowEvent):
"""
Update capability execution given a received event
TODO: Implement execution cancellation
:param event: Incoming event
"""
capability_event = self.workflow_to_capability_event(event)
# TODO:add capability event sending
# capability_events = Channel(CapabilityEventChannel(), threaded=True)
# capability_events.send(capability_event)
if not capability_event:
# WorkflowEvent does not need to update the execution
return
execution = self.capability_info.lookup_execution(capability_event.execution_id)
step_sequence = CapabilitySequence.from_str(execution.steps)
current_step = step_sequence[execution.current_step]
event_type = CapabilityEventType[capability_event.event_type.name]
# Check to make sure event type is correct
if current_step.step_type.value == event_type.value:
# Check if previous step (that just completed) is run workflow step
if (
step_sequence[execution.current_step].step_type
== CapabilityStepType.PrepareAndRunWorkflow
):
# Return capability engine to available state
if self.queues:
self.queues.get(execution.capability.name).complete_execution(
capability_event.execution_id
)
if execution.current_step != len(step_sequence)-1:
# Execution is not on its last step
execution.current_step += 1
else:
# Execution on its last step and received the proper event
execution.state = ExecutionState.Complete.name
self.capability_info.save_execution(execution)
return
# Check if upcoming step is run workflow step
if (
step_sequence[execution.current_step].step_type
== CapabilityStepType.PrepareAndRunWorkflow
):
# Update execution record's state
execution.state = ExecutionState.ExecutingStep.name
# Enqueue execution that is on a run workflow step
# FIXME: Priority needs to be dynamic; perhaps a priority column in execution schema?
self.enqueue_execution(execution, ExecutionPriority.Default.value)
# Update execution record's state
# execution.state = ExecutionState.Ready.name
self.capability_info.save_execution(execution)
else:
# TODO: Logging
print(
f"Mismatched event type {event_type} for execution {capability_event.execution_id}"
)
def enqueue_execution(
self,
execution: CapabilityExecutionIF,
priority: int = ExecutionPriority.Default.value,
):
"""
Move execution record that is ready to execute a workflow into the appropriate capability
queue
:param execution: Capability execution record
:param priority: Priority of the requested execution
"""
# Get correct queue or initialize one
queue = self.queues.get(
execution.capability.name,
CapabilityQueue(
self.capability_info,
self.workflow_service,
execution.capability.max_jobs,
),
)
queue.enqueue(execution, priority)
self.queues[execution.capability.name] = queue
# Remove execution record from pool
if execution in self.execution_pool:
self.execution_pool.remove(execution)
return self.execution_manager.run_capability(request)
def workflow_to_capability_event(self, event: WorkflowEvent) -> CapabilityEvent:
"""
Convert a workflow event into a capability event
Translate a workflow event into a capability event
:param event: Workflow event
:return: Capability event translated from given workflow event
......@@ -187,10 +89,39 @@ class CapabilityService(CapabilityServiceIF):
return CapabilityEvent(event_type, execution.capability_request.id, execution.id)
def listen_for_events(self):
def process_workflow_event(self, event: WorkflowEvent):
"""
Method that will run in a different thread that will be listening for capability events that it can use
to update capability executions
Method that takes in a workflow event and determines an action to take from there
:param event: Workflow event
"""
# Translate workflow event into capability event
capability_event = self.workflow_to_capability_event(event)
# TODO:add capability event sending
# capability_events = Channel(CapabilityEventChannel(), threaded=True)
# capability_events.send(capability_event)
if capability_event:
# Workflow event corresponds to a capability event
self.process_capability_event(capability_event)
def process_capability_event(self, event: CapabilityEvent):
"""
Method that takes in a capability event and determines an action to take from there
:param event: Capability event
"""
self.execution_manager.update_execution(event)
def listen_for_workflow_events(self):
"""
Threaded method that listens for workflow events from the AMQP network that are used to update executions
"""
thread_workflow_events = Channel(WorkflowEventChannel(), threaded=True)
thread_workflow_events.listen(callback=self.update_execution)
thread_workflow_events.listen(callback=self.process_workflow_event)
def listen_for_capability_events(self):
"""
Threaded method that listens for capability events from the AMQP network
"""
thread_capability_events = Channel(CapabilityEventChannel(), threaded=True)
thread_capability_events.listen(callback=self.process_capability_event)
from workspaces.capability.enums import (
CapabilityEventType,
CapabilityStepType,
ExecutionPriority,
ExecutionState,
)
from workspaces.capability.helpers import CapabilitySequence
from workspaces.capability.schema import CapabilityEvent, CapabilityRequest
from workspaces.capability.schema_interfaces import CapabilityExecutionIF
from workspaces.capability.services.capability_queue import CapabilityQueue
from workspaces.capability.services.interfaces import CapabilityInfoIF
from workspaces.workflow.services.interfaces import WorkflowServiceIF
class ExecutionManager:
"""
Class that manages the capability execution system. It's duties include:
- Creating new executions
- Managing existing execution state
- Handling capability events and updating the state of the system accordingly
"""
def __init__(self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF):
self.execution_pool = []
self.queues = {}
self.capability_info = capability_info
self.workflow_service = workflow_service
def run_capability(self, request: CapabilityRequest) -> CapabilityExecutionIF:
"""
Run a capability with the requested settings
TODO: Executions should be made from capability versions, not requests
:param request: Capability request
:return: Execution of the request
"""
execution = self.capability_info.create_execution(request)
steps = CapabilitySequence.from_str(execution.steps)
if steps[0].step_type == CapabilityStepType.PrepareAndRunWorkflow:
# First step is to run workflow, move to capability queue
# FIXME: Priority needs to be dynamic
self.enqueue_execution(execution, ExecutionPriority.Default.value)
else:
self.execution_pool.append(execution)
return execution
def update_execution(self, capability_event: CapabilityEvent):
"""
Update capability execution given a received capability event
:param capability_event: Received capability event
"""
execution = self.capability_info.lookup_execution(capability_event.execution_id)
step_sequence = CapabilitySequence.from_str(execution.steps)
current_step = step_sequence[execution.current_step]
event_type = CapabilityEventType[capability_event.event_type.name]
# Check if event corresponds to current capability step
if current_step.step_type.value == event_type.value:
# Check if previous step (that just completed) is run workflow step
if current_step.step_type == CapabilityStepType.PrepareAndRunWorkflow:
if self.queues:
# Return capability engine to available state
self.queues.get(execution.capability.name).complete_execution(
capability_event.execution_id
)
if not execution.on_last_step():
execution.current_step += 1
else:
# Execution on its last step and received the proper event
execution.state = ExecutionState.Complete.name
self.capability_info.save_execution(execution)
return
# Check if upcoming step is run workflow step
if current_step.step_type == CapabilityStepType.PrepareAndRunWorkflow:
# Enqueue execution that is on a run workflow step
# FIXME: Priority needs to be dynamic; perhaps a priority column in execution schema?
self.enqueue_execution(execution, ExecutionPriority.Default.value)
# Update execution record's state
# execution.state = ExecutionState.Ready.name
self.capability_info.save_execution(execution)
else:
# Event type does not correspond to current step
# TODO: Logging
print(
f"Mismatched event type {event_type} for execution {capability_event.execution_id}"
)
def enqueue_execution(
self,
execution: CapabilityExecutionIF,
priority: int = ExecutionPriority.Default.value,
):
"""
Move execution record that is ready to execute a workflow into the appropriate capability
queue
:param execution: Capability execution record
:param priority: Priority of the requested execution
"""
# Get correct queue or initialize one
queue = self.queues.get(
execution.capability.name,
CapabilityQueue(
self.capability_info,
self.workflow_service,
execution.capability.max_jobs,
),
)
queue.enqueue(execution, priority)
self.queues[execution.capability.name] = queue
# Remove execution record from pool
if execution in self.execution_pool:
self.execution_pool.remove(execution)
......@@ -101,6 +101,7 @@ class CapabilityEngineIF(ABC):
"""
Executes a prepare and run workflow step of a capability
"""
@abstractmethod
def load_engine(self, execution: CapabilityExecutionIF):
pass
......@@ -121,10 +122,4 @@ class CapabilityServiceIF(ABC):
The capability service: clients access this to request capability runs
"""
@abstractmethod
def run_capability(self, request: CapabilityRequestIF):
"""
Run a capability with the requested settings
:param request: Capability request
"""
raise NotImplementedError
pass
......@@ -11,24 +11,93 @@ pytest_plugins = ["testing.utils.conftest"]
2. All fixtures within this file will now be available for use
3. If you want a non-fixture item from here, you'll have to do a normal Python import
"""
from typing import List
from unittest.mock import patch
import pytest
from alchemy_mock.mocking import UnifiedAlchemyMagicMock
from workspaces.capability.schema import Capability
from workspaces.capability.schema_interfaces import CapabilityRequestIF
from workspaces.capability.services.capability_info import CapabilityInfo
from workspaces.capability.services.capability_service import CapabilityService
from workspaces.workflow.services.workflow_info import WorkflowInfo
from workspaces.workflow.services.workflow_service import WorkflowService
@pytest.fixture(scope="session")
def mock_capability_info():
def mock_capability_info() -> CapabilityInfo:
"""
Pytest fixture providing a mock capability_info object; mocked object can be used
Pytest fixture providing a mock CapabilityInfo object with a test capability; mocked object can be used
like a normal CapabilityInfo instance
TODO: Use UnifiedAlchemyMagicMock's data parameter to add test capabilities
:return: Mock capability_info
:return: Mocked CapabilityInfo
"""
mocked_capability_info = CapabilityInfo(UnifiedAlchemyMagicMock())
test_capability = Capability(name="null", steps="test", max_jobs=2)
mocked_capability_info.save_entity(test_capability)
null_capability = Capability(
name="null", steps="prepare-and-run-workflow null\nawait-workflow", max_jobs=2
)
# test_capability = Capability(name="test", steps="await-qa", max_jobs=777)
mocked_capability_info.save_entity(null_capability)
# mocked_capability_info.save_entity(test_capability)
return mocked_capability_info
@pytest.fixture(scope="session")
def mock_capability_service(
mock_capability_info: CapabilityInfo,
mock_workflow_service: WorkflowService,
) -> CapabilityService:
"""
Pytest fixture providing a mock CapabilityService object with no threaded event listener creation
:return: Mocked CapabilityService
"""
with patch("threading.Thread"):
with patch("threading.Thread.start"):
yield CapabilityService(mock_capability_info, mock_workflow_service)
@pytest.fixture(scope="session")
def mock_workflow_info() -> WorkflowInfo:
"""
Pytest fixture providing a mock WorkflowInfo object; mocked object can be used
like a normal WorkflowInfo instance
:return: Mocked WorkflowInfo
"""
return WorkflowInfo(UnifiedAlchemyMagicMock())
@pytest.fixture(scope="session")
def mock_workflow_service(mock_workflow_info: WorkflowInfo) -> WorkflowService:
"""
Pytest fixture providing a mock WorkflowService object
:return: Mocked WorkflowService
"""
with patch("threading.Thread"):
with patch("threading.Thread.start"):
yield WorkflowService(mock_workflow_info)
@pytest.fixture(scope="session")
def mock_capability_request(
mock_capability_service: CapabilityService,
) -> List[CapabilityRequestIF]:
"""
Pytest fixture adding two mock CapabilityRequest objects that can be accessed from the mock_capability_info
:param mock_capability_service: Mocked CapabilityService
:return: Mocked CapabilityRequest
"""
requests = []
r1 = mock_capability_service.create_request("null", ["-g"])
r1.id = 0
requests.append(r1)
r2 = mock_capability_service.create_request("test")
r2.id = 1
requests.append(r2)
return requests
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment