Skip to content
Snippets Groups Projects
Commit 6291b54b authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Capability queue implementation fleshed out; capability engine implemented;...

Capability queue implementation fleshed out; capability engine implemented; CapabilityService.update_execution() now interacts with queues and their engines
parent 6a02b037
No related branches found
No related tags found
No related merge requests found
...@@ -4,10 +4,11 @@ Services defined by the Workspaces system, to be used by our APIs and client pro ...@@ -4,10 +4,11 @@ Services defined by the Workspaces system, to be used by our APIs and client pro
import shutil import shutil
import stat import stat
import subprocess import subprocess
from collections import namedtuple
from pathlib import Path from pathlib import Path
from queue import PriorityQueue, Queue, Empty from queue import PriorityQueue, Queue, Empty
from tempfile import mkdtemp from tempfile import mkdtemp
from typing import Dict, List, Union, Optional, Type from typing import Dict, List, Union, Optional, Type, NamedTuple
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from workflow.event_catcher import EventCatcher from workflow.event_catcher import EventCatcher
...@@ -28,6 +29,9 @@ from .helpers import ( ...@@ -28,6 +29,9 @@ from .helpers import (
ExecutionState, ExecutionState,
CapabilityEventType, CapabilityEventType,
CapabilityStepType, CapabilityStepType,
CapabilityStep,
FutureProduct,
Parameter,
) )
from .product_interfaces import FutureProductIF from .product_interfaces import FutureProductIF
from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF
...@@ -42,7 +46,9 @@ from .schema import ( ...@@ -42,7 +46,9 @@ from .schema import (
get_session_factory, get_session_factory,
WorkflowRequest, WorkflowRequest,
AbstractFile, AbstractFile,
CapabilityEvent, CapabilityVersion, Base, CapabilityEvent,
CapabilityVersion,
Base,
) )
from channels.amqp_helpers import ( from channels.amqp_helpers import (
workflow_events, workflow_events,
...@@ -61,7 +67,7 @@ class CapabilityService(CapabilityServiceIF): ...@@ -61,7 +67,7 @@ class CapabilityService(CapabilityServiceIF):
The capability service: clients access this to request capability runs The capability service: clients access this to request capability runs
""" """
def __init__(self, info: CapabilityInfoIF): def __init__(self, info: "CapabilityInfo"):
self.execution_pool = [] self.execution_pool = []
self.queues = {} self.queues = {}
self.capability_info = info self.capability_info = info
...@@ -69,8 +75,8 @@ class CapabilityService(CapabilityServiceIF): ...@@ -69,8 +75,8 @@ class CapabilityService(CapabilityServiceIF):
def create_request( def create_request(
self, self,
capability_name: str, capability_name: str,
parameters: List[ParameterIF] = None, parameters: List[Parameter] = None,
products: List[FutureProductIF] = None, products: List[FutureProduct] = None,
) -> CapabilityRequest: ) -> CapabilityRequest:
return self.capability_info.create_capability_request( return self.capability_info.create_capability_request(
capability_name, parameters, products capability_name, parameters, products
...@@ -97,6 +103,7 @@ class CapabilityService(CapabilityServiceIF): ...@@ -97,6 +103,7 @@ class CapabilityService(CapabilityServiceIF):
Update capability execution given a received event Update capability execution given a received event
:param execution_id: ID of execution record :param execution_id: ID of execution record
:param event: Incoming event :param event: Incoming event
TODO: Implement execution cancellation
""" """
execution = self.capability_info.lookup_execution(execution_id) execution = self.capability_info.lookup_execution(execution_id)
step_sequence = CapabilitySequence.from_str(execution.steps) step_sequence = CapabilitySequence.from_str(execution.steps)
...@@ -105,13 +112,37 @@ class CapabilityService(CapabilityServiceIF): ...@@ -105,13 +112,37 @@ class CapabilityService(CapabilityServiceIF):
# Check to make sure event type is correct # Check to make sure event type is correct
if current_step.step_type.value == event_type.value: if current_step.step_type.value == event_type.value:
execution.current_step += 1 # Check if previous step (that just completed) is run workflow step
if ( if (
step_sequence[execution.current_step] step_sequence[execution.current_step].step_type
== CapabilityStepType.PrepareAndRunWorkflow.name
):
# Return capability engine to available state
self.queues[execution.capability.id].complete_execution(execution_id)
if execution.current_step != len(step_sequence):
# Execution is not on its last step
execution.current_step += 1
else:
# Execution on its last step and received the proper event
execution.state = ExecutionState.Complete.name
self.capability_info.save_entity(execution)
return
# Check if upcoming step is run workflow step
if (
step_sequence[execution.current_step].step_type
== CapabilityStepType.PrepareAndRunWorkflow == CapabilityStepType.PrepareAndRunWorkflow
): ):
# FIXME: Priority needs to be dynamic # Update execution record's state
execution.state = ExecutionState.ExecutingStep.name
# Enqueue execution that is on a run workflow step
# FIXME: Priority needs to be dynamic; perhaps a priority column in execution schema?
self.enqueue_execution(execution, ExecutionPriority.Default.value) self.enqueue_execution(execution, ExecutionPriority.Default.value)
# Update execution record's state
execution.state = ExecutionState.Ready.name
self.capability_info.save_entity(execution)
else: else:
print(f"Mismatched event type {event_type} for execution {execution_id}") print(f"Mismatched event type {event_type} for execution {execution_id}")
...@@ -125,7 +156,10 @@ class CapabilityService(CapabilityServiceIF): ...@@ -125,7 +156,10 @@ class CapabilityService(CapabilityServiceIF):
queue queue
""" """
# Get correct queue or initialize one # Get correct queue or initialize one
queue = self.queues.get(execution.capability.id, CapabilityQueue(execution.capability.max_jobs)) queue = self.queues.get(
execution.capability.id,
CapabilityQueue(self.capability_info, execution.capability.max_jobs),
)
queue.enqueue(execution, priority) queue.enqueue(execution, priority)
self.queues[execution.capability.id] = queue self.queues[execution.capability.id] = queue
...@@ -177,7 +211,7 @@ class CapabilityInfo(CapabilityInfoIF): ...@@ -177,7 +211,7 @@ class CapabilityInfo(CapabilityInfoIF):
capability=capability, capability=capability,
parameters=str(parameters), parameters=str(parameters),
# a trick here is to ensure that we always have a first version, with the original parameters # a trick here is to ensure that we always have a first version, with the original parameters
versions=[CapabilityVersion(version_number=1, parameters=str(parameters))] versions=[CapabilityVersion(version_number=1, parameters=str(parameters))],
) )
self.save_entity(request) self.save_entity(request)
return request return request
...@@ -196,18 +230,18 @@ class CapabilityInfo(CapabilityInfoIF): ...@@ -196,18 +230,18 @@ class CapabilityInfo(CapabilityInfoIF):
version=most_recent_version, version=most_recent_version,
current_step=0, current_step=0,
# ensure that we have a copy of the step sequence as it was when the execution started # ensure that we have a copy of the step sequence as it was when the execution started
steps=request.capability.steps steps=request.capability.steps,
) )
self.save_entity(execution) self.save_entity(execution)
return execution return execution
def lookup_capability(self, capability_name: str) -> CapabilityIF: def lookup_capability(self, capability_name: str) -> Capability:
return self.session.query(Capability).filter_by(name=capability_name).one() return self.session.query(Capability).filter_by(name=capability_name).one()
def lookup_capability_request(self, request_id) -> "CapabilityRequestIF": def lookup_capability_request(self, request_id) -> CapabilityRequest:
return self.session.query(CapabilityRequest).filter_by(id=request_id).one() return self.session.query(CapabilityRequest).filter_by(id=request_id).one()
def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF: def lookup_execution(self, execution_id: int) -> CapabilityExecution:
return self.session.query(CapabilityExecution).filter_by(id=execution_id).one() return self.session.query(CapabilityExecution).filter_by(id=execution_id).one()
def save_entity(self, entity: Base): def save_entity(self, entity: Base):
...@@ -220,16 +254,40 @@ class CapabilityEngine(CapabilityEngineIF): ...@@ -220,16 +254,40 @@ class CapabilityEngine(CapabilityEngineIF):
Executes a prepare and run workflow step of a capability Executes a prepare and run workflow step of a capability
""" """
def __init__(self, workflow_service: "WorkflowService", execution: CapabilityExecution = None): def __init__(
self,
capability_info: CapabilityInfo,
workflow_service: "WorkflowService",
execution_id: Optional[int] = None,
):
self.capability_info = capability_info
self.workflow_service = workflow_service self.workflow_service = workflow_service
self.execution = execution self.execution_id = execution_id
if self.execution_id:
self.execution = self.capability_info.lookup_execution(execution_id)
def execute(self): def execute(self):
""" """
Communicate with workflow service and send a request to run a workflow with given settings Communicate with workflow service and send a request to run a workflow with given settings
FIXME: Implement
""" """
# workflow_service.execute(workflow_name, arguments, files) step_sequence = CapabilitySequence.from_str(self.execution.steps)
# Grab value of current step (workflow name)
cur_step = step_sequence[int(self.execution.current_step)]
workflow_name = cur_step.step_value
workflow_args = cur_step.step_args
# FIXME: Add support for files
files = []
self.workflow_service.execute(workflow_name, workflow_args, files)
def is_complete(self):
state = self.capability_info.lookup_execution(self.execution.id).state
if state == ExecutionState.Complete.name:
# Execution completed execution
return True
else:
return False
class CapabilityQueue(CapabilityQueueIF): class CapabilityQueue(CapabilityQueueIF):
...@@ -238,39 +296,83 @@ class CapabilityQueue(CapabilityQueueIF): ...@@ -238,39 +296,83 @@ class CapabilityQueue(CapabilityQueueIF):
the number of concurrent executions or pause execution altogether the number of concurrent executions or pause execution altogether
""" """
def __init__(self, max_concurrency: int): EngineList = namedtuple("EngineList", ["available", "in_use"])
def __init__(self, capability_info: CapabilityInfo, max_concurrency: int):
self.queue = PriorityQueue() self.queue = PriorityQueue()
self.workflow_service = WorkflowService()
self.max_concurrency = max_concurrency self.max_concurrency = max_concurrency
self.engines = Queue() self.engine_list = self.init_engines(
for i in range(self.max_concurrency): max_concurrency, capability_info, WorkflowService()
self.engines.put(CapabilityEngine(workflow_service=self.workflow_service)) )
self.paused = False self.paused = False
@classmethod
def init_engines(
cls,
num_engines: int,
capability_info: CapabilityInfo,
workflow_service: "WorkflowService",
) -> cls.EngineList:
"""
Initialize available engine queue and list of in-use engines
:return: EngineList
"""
engine_list = cls.EngineList(Queue(), [])
for _ in range(num_engines):
engine_list.available.put(
CapabilityEngine(capability_info, workflow_service)
)
return engine_list
def process_execution(self): def process_execution(self):
""" """
Process execution at the front of the queue; I imagine this would be called repeatedly Process execution at the front of the queue; I imagine this would be called repeatedly
with a small delay with a small delay
""" """
if self.queue.empty() is False: if self.queue.empty() is False:
execution = self.queue.get() execution_id = self.queue.get()
if engine := self.get_available_engine(): engine = self.get_available_engine()
if engine:
# Place execution in engine # Place execution in engine
engine.execution = execution engine.execution_id = execution_id
# Start engine # Start engine
engine.execute() engine.execute()
# TODO: Do I need to keep track of executing engines? # Move engine to in-use list
self.engine_list.in_use.append(engine)
else: else:
# FIXME: Logging # FIXME: Logging
print("No available engines. Try again later.") print("No available engines. Try again later.")
def find_engine(self, execution_id: int) -> CapabilityEngine:
"""
Find in-use engine executing execution with given ID
:param execution_id: ID of requested execution
:return: Corresponding engine
"""
for engine in self.engine_list.in_use:
if engine.execution_id == execution_id:
# Found correct engine
return engine
def complete_execution(self, execution_id: int):
"""
Remove execution from its engine and return engine to available queue
:param execution_id: ID of requested execution
"""
engine = self.find_engine(execution_id)
self.engine_list.in_use.remove(engine)
engine.execution_id = None
engine.execution = None
self.engine_list.available.put(engine)
def get_available_engine(self) -> Optional[CapabilityEngine]: def get_available_engine(self) -> Optional[CapabilityEngine]:
""" """
Get an engine if one is available Get an engine if one is available
:return: Engine if one is available, else False :return: Engine if one is available, else False
""" """
try: try:
return self.engines.get_nowait() return self.engine_list.available.get_nowait()
except Empty: except Empty:
return None return None
...@@ -281,9 +383,11 @@ class CapabilityQueue(CapabilityQueueIF): ...@@ -281,9 +383,11 @@ class CapabilityQueue(CapabilityQueueIF):
return self.queue.get() return self.queue.get()
def pause(self): def pause(self):
# TODO: Implement
self.paused = True self.paused = True
def unpause(self): def unpause(self):
# TODO: Implement
self.paused = False self.paused = False
def __len__(self): def __len__(self):
...@@ -291,6 +395,10 @@ class CapabilityQueue(CapabilityQueueIF): ...@@ -291,6 +395,10 @@ class CapabilityQueue(CapabilityQueueIF):
class WorkflowService(WorkflowServiceIF): class WorkflowService(WorkflowServiceIF):
"""
Executes workflows; should be a freestanding service.
"""
def execute(self, workflow_name: str, argument: Dict, files: List[Path]): def execute(self, workflow_name: str, argument: Dict, files: List[Path]):
""" """
Execute a workflow per the supplied parameters. Execute a workflow per the supplied parameters.
...@@ -452,9 +560,8 @@ class WorkflowInfo(WorkflowInfoIF): ...@@ -452,9 +560,8 @@ class WorkflowInfo(WorkflowInfoIF):
return self.session.query(Workflow).all() return self.session.query(Workflow).all()
def create_workflow_request( def create_workflow_request(
self, self, workflow_name: str, argument: Dict
workflow_name: str, ) -> WorkflowRequest:
argument: Dict) -> WorkflowRequest:
""" """
Create new workflow request and save to database Create new workflow request and save to database
:param workflow_name: name of workflow to run :param workflow_name: name of workflow to run
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment