From 6291b54b1b84e0f7353148be868883e56a0b49b3 Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Mon, 12 Oct 2020 16:22:20 -0600 Subject: [PATCH] Capability queue implementation fleshed out; capability engine implemented; CapabilityService.update_execution() now interacts with queues and their engines --- shared/workspaces/src/workspaces/services.py | 169 +++++++++++++++---- 1 file changed, 138 insertions(+), 31 deletions(-) diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index f5df9b44a..062881831 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -4,10 +4,11 @@ Services defined by the Workspaces system, to be used by our APIs and client pro import shutil import stat import subprocess +from collections import namedtuple from pathlib import Path from queue import PriorityQueue, Queue, Empty from tempfile import mkdtemp -from typing import Dict, List, Union, Optional, Type +from typing import Dict, List, Union, Optional, Type, NamedTuple from sqlalchemy.orm import Session from workflow.event_catcher import EventCatcher @@ -28,6 +29,9 @@ from .helpers import ( ExecutionState, CapabilityEventType, CapabilityStepType, + CapabilityStep, + FutureProduct, + Parameter, ) from .product_interfaces import FutureProductIF from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF @@ -42,7 +46,9 @@ from .schema import ( get_session_factory, WorkflowRequest, AbstractFile, - CapabilityEvent, CapabilityVersion, Base, + CapabilityEvent, + CapabilityVersion, + Base, ) from channels.amqp_helpers import ( workflow_events, @@ -61,7 +67,7 @@ class CapabilityService(CapabilityServiceIF): The capability service: clients access this to request capability runs """ - def __init__(self, info: CapabilityInfoIF): + def __init__(self, info: "CapabilityInfo"): self.execution_pool = [] self.queues = {} self.capability_info = info @@ -69,8 +75,8 @@ class CapabilityService(CapabilityServiceIF): def create_request( self, capability_name: str, - parameters: List[ParameterIF] = None, - products: List[FutureProductIF] = None, + parameters: List[Parameter] = None, + products: List[FutureProduct] = None, ) -> CapabilityRequest: return self.capability_info.create_capability_request( capability_name, parameters, products @@ -97,6 +103,7 @@ class CapabilityService(CapabilityServiceIF): Update capability execution given a received event :param execution_id: ID of execution record :param event: Incoming event + TODO: Implement execution cancellation """ execution = self.capability_info.lookup_execution(execution_id) step_sequence = CapabilitySequence.from_str(execution.steps) @@ -105,13 +112,37 @@ class CapabilityService(CapabilityServiceIF): # Check to make sure event type is correct if current_step.step_type.value == event_type.value: - execution.current_step += 1 + # Check if previous step (that just completed) is run workflow step if ( - step_sequence[execution.current_step] + step_sequence[execution.current_step].step_type + == CapabilityStepType.PrepareAndRunWorkflow.name + ): + # Return capability engine to available state + self.queues[execution.capability.id].complete_execution(execution_id) + + if execution.current_step != len(step_sequence): + # Execution is not on its last step + execution.current_step += 1 + else: + # Execution on its last step and received the proper event + execution.state = ExecutionState.Complete.name + self.capability_info.save_entity(execution) + return + + # Check if upcoming step is run workflow step + if ( + step_sequence[execution.current_step].step_type == CapabilityStepType.PrepareAndRunWorkflow ): - # FIXME: Priority needs to be dynamic + # Update execution record's state + execution.state = ExecutionState.ExecutingStep.name + # Enqueue execution that is on a run workflow step + # FIXME: Priority needs to be dynamic; perhaps a priority column in execution schema? self.enqueue_execution(execution, ExecutionPriority.Default.value) + + # Update execution record's state + execution.state = ExecutionState.Ready.name + self.capability_info.save_entity(execution) else: print(f"Mismatched event type {event_type} for execution {execution_id}") @@ -125,7 +156,10 @@ class CapabilityService(CapabilityServiceIF): queue """ # Get correct queue or initialize one - queue = self.queues.get(execution.capability.id, CapabilityQueue(execution.capability.max_jobs)) + queue = self.queues.get( + execution.capability.id, + CapabilityQueue(self.capability_info, execution.capability.max_jobs), + ) queue.enqueue(execution, priority) self.queues[execution.capability.id] = queue @@ -177,7 +211,7 @@ class CapabilityInfo(CapabilityInfoIF): capability=capability, parameters=str(parameters), # a trick here is to ensure that we always have a first version, with the original parameters - versions=[CapabilityVersion(version_number=1, parameters=str(parameters))] + versions=[CapabilityVersion(version_number=1, parameters=str(parameters))], ) self.save_entity(request) return request @@ -196,18 +230,18 @@ class CapabilityInfo(CapabilityInfoIF): version=most_recent_version, current_step=0, # ensure that we have a copy of the step sequence as it was when the execution started - steps=request.capability.steps + steps=request.capability.steps, ) self.save_entity(execution) return execution - def lookup_capability(self, capability_name: str) -> CapabilityIF: + def lookup_capability(self, capability_name: str) -> Capability: return self.session.query(Capability).filter_by(name=capability_name).one() - def lookup_capability_request(self, request_id) -> "CapabilityRequestIF": + def lookup_capability_request(self, request_id) -> CapabilityRequest: return self.session.query(CapabilityRequest).filter_by(id=request_id).one() - def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF: + def lookup_execution(self, execution_id: int) -> CapabilityExecution: return self.session.query(CapabilityExecution).filter_by(id=execution_id).one() def save_entity(self, entity: Base): @@ -220,16 +254,40 @@ class CapabilityEngine(CapabilityEngineIF): Executes a prepare and run workflow step of a capability """ - def __init__(self, workflow_service: "WorkflowService", execution: CapabilityExecution = None): + def __init__( + self, + capability_info: CapabilityInfo, + workflow_service: "WorkflowService", + execution_id: Optional[int] = None, + ): + self.capability_info = capability_info self.workflow_service = workflow_service - self.execution = execution + self.execution_id = execution_id + if self.execution_id: + self.execution = self.capability_info.lookup_execution(execution_id) def execute(self): """ Communicate with workflow service and send a request to run a workflow with given settings - FIXME: Implement """ - # workflow_service.execute(workflow_name, arguments, files) + step_sequence = CapabilitySequence.from_str(self.execution.steps) + # Grab value of current step (workflow name) + cur_step = step_sequence[int(self.execution.current_step)] + workflow_name = cur_step.step_value + workflow_args = cur_step.step_args + # FIXME: Add support for files + files = [] + + self.workflow_service.execute(workflow_name, workflow_args, files) + + def is_complete(self): + state = self.capability_info.lookup_execution(self.execution.id).state + + if state == ExecutionState.Complete.name: + # Execution completed execution + return True + else: + return False class CapabilityQueue(CapabilityQueueIF): @@ -238,39 +296,83 @@ class CapabilityQueue(CapabilityQueueIF): the number of concurrent executions or pause execution altogether """ - def __init__(self, max_concurrency: int): + EngineList = namedtuple("EngineList", ["available", "in_use"]) + + def __init__(self, capability_info: CapabilityInfo, max_concurrency: int): self.queue = PriorityQueue() - self.workflow_service = WorkflowService() self.max_concurrency = max_concurrency - self.engines = Queue() - for i in range(self.max_concurrency): - self.engines.put(CapabilityEngine(workflow_service=self.workflow_service)) + self.engine_list = self.init_engines( + max_concurrency, capability_info, WorkflowService() + ) self.paused = False + @classmethod + def init_engines( + cls, + num_engines: int, + capability_info: CapabilityInfo, + workflow_service: "WorkflowService", + ) -> cls.EngineList: + """ + Initialize available engine queue and list of in-use engines + :return: EngineList + """ + engine_list = cls.EngineList(Queue(), []) + for _ in range(num_engines): + engine_list.available.put( + CapabilityEngine(capability_info, workflow_service) + ) + + return engine_list + def process_execution(self): """ Process execution at the front of the queue; I imagine this would be called repeatedly with a small delay """ if self.queue.empty() is False: - execution = self.queue.get() - if engine := self.get_available_engine(): + execution_id = self.queue.get() + engine = self.get_available_engine() + if engine: # Place execution in engine - engine.execution = execution + engine.execution_id = execution_id # Start engine engine.execute() - # TODO: Do I need to keep track of executing engines? + # Move engine to in-use list + self.engine_list.in_use.append(engine) else: # FIXME: Logging print("No available engines. Try again later.") + def find_engine(self, execution_id: int) -> CapabilityEngine: + """ + Find in-use engine executing execution with given ID + :param execution_id: ID of requested execution + :return: Corresponding engine + """ + for engine in self.engine_list.in_use: + if engine.execution_id == execution_id: + # Found correct engine + return engine + + def complete_execution(self, execution_id: int): + """ + Remove execution from its engine and return engine to available queue + :param execution_id: ID of requested execution + """ + engine = self.find_engine(execution_id) + self.engine_list.in_use.remove(engine) + engine.execution_id = None + engine.execution = None + self.engine_list.available.put(engine) + def get_available_engine(self) -> Optional[CapabilityEngine]: """ Get an engine if one is available :return: Engine if one is available, else False """ try: - return self.engines.get_nowait() + return self.engine_list.available.get_nowait() except Empty: return None @@ -281,9 +383,11 @@ class CapabilityQueue(CapabilityQueueIF): return self.queue.get() def pause(self): + # TODO: Implement self.paused = True def unpause(self): + # TODO: Implement self.paused = False def __len__(self): @@ -291,6 +395,10 @@ class CapabilityQueue(CapabilityQueueIF): class WorkflowService(WorkflowServiceIF): + """ + Executes workflows; should be a freestanding service. + """ + def execute(self, workflow_name: str, argument: Dict, files: List[Path]): """ Execute a workflow per the supplied parameters. @@ -452,9 +560,8 @@ class WorkflowInfo(WorkflowInfoIF): return self.session.query(Workflow).all() def create_workflow_request( - self, - workflow_name: str, - argument: Dict) -> WorkflowRequest: + self, workflow_name: str, argument: Dict + ) -> WorkflowRequest: """ Create new workflow request and save to database :param workflow_name: name of workflow to run -- GitLab