diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index cb352beb0b790166b9d2a553abb573cc5026a475..f5df9b44ac004f30eed62f351915fd95c5300113 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -5,7 +5,7 @@ import shutil import stat import subprocess from pathlib import Path -from queue import PriorityQueue +from queue import PriorityQueue, Queue, Empty from tempfile import mkdtemp from typing import Dict, List, Union, Optional, Type @@ -134,22 +134,6 @@ class CapabilityService(CapabilityServiceIF): self.execution_pool.remove(execution) -class CapabilityEngine(CapabilityEngineIF): - """ - Executes a prepare and run workflow step of a capability - """ - - def __init__(self, execution: CapabilityExecution): - self.execution = execution - - def execute(self): - """ - Communicate with workflow service and send a request to run a workflow with given settings - FIXME: Implement - """ - # workflow_service.execute(workflow_name, arguments, files) - - class CapabilityInfo(CapabilityInfoIF): """ Interface for data access object that can look up and record information about capabilities @@ -231,12 +215,65 @@ class CapabilityInfo(CapabilityInfoIF): self.session.flush() +class CapabilityEngine(CapabilityEngineIF): + """ + Executes a prepare and run workflow step of a capability + """ + + def __init__(self, workflow_service: "WorkflowService", execution: CapabilityExecution = None): + self.workflow_service = workflow_service + self.execution = execution + + def execute(self): + """ + Communicate with workflow service and send a request to run a workflow with given settings + FIXME: Implement + """ + # workflow_service.execute(workflow_name, arguments, files) + + class CapabilityQueue(CapabilityQueueIF): + """ + Organizes capability executions in a priority order and makes it possible to control + the number of concurrent executions or pause execution altogether + """ + def __init__(self, max_concurrency: int): self.queue = PriorityQueue() + self.workflow_service = WorkflowService() self.max_concurrency = max_concurrency + self.engines = Queue() + for i in range(self.max_concurrency): + self.engines.put(CapabilityEngine(workflow_service=self.workflow_service)) self.paused = False + def process_execution(self): + """ + Process execution at the front of the queue; I imagine this would be called repeatedly + with a small delay + """ + if self.queue.empty() is False: + execution = self.queue.get() + if engine := self.get_available_engine(): + # Place execution in engine + engine.execution = execution + # Start engine + engine.execute() + # TODO: Do I need to keep track of executing engines? + else: + # FIXME: Logging + print("No available engines. Try again later.") + + def get_available_engine(self) -> Optional[CapabilityEngine]: + """ + Get an engine if one is available + :return: Engine if one is available, else False + """ + try: + return self.engines.get_nowait() + except Empty: + return None + def enqueue(self, request: CapabilityRequest, priority: int): self.queue.put((priority, request))