Skip to content
Snippets Groups Projects
Commit dc5a49ea authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Work on CapabilityQueue and CapabilityEngine to enable automatic

execution processing
parent ff0c0f20
No related branches found
No related tags found
No related merge requests found
...@@ -5,7 +5,7 @@ import shutil ...@@ -5,7 +5,7 @@ import shutil
import stat import stat
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from queue import PriorityQueue from queue import PriorityQueue, Queue, Empty
from tempfile import mkdtemp from tempfile import mkdtemp
from typing import Dict, List, Union, Optional, Type from typing import Dict, List, Union, Optional, Type
...@@ -134,22 +134,6 @@ class CapabilityService(CapabilityServiceIF): ...@@ -134,22 +134,6 @@ class CapabilityService(CapabilityServiceIF):
self.execution_pool.remove(execution) self.execution_pool.remove(execution)
class CapabilityEngine(CapabilityEngineIF):
"""
Executes a prepare and run workflow step of a capability
"""
def __init__(self, execution: CapabilityExecution):
self.execution = execution
def execute(self):
"""
Communicate with workflow service and send a request to run a workflow with given settings
FIXME: Implement
"""
# workflow_service.execute(workflow_name, arguments, files)
class CapabilityInfo(CapabilityInfoIF): class CapabilityInfo(CapabilityInfoIF):
""" """
Interface for data access object that can look up and record information about capabilities Interface for data access object that can look up and record information about capabilities
...@@ -231,12 +215,65 @@ class CapabilityInfo(CapabilityInfoIF): ...@@ -231,12 +215,65 @@ class CapabilityInfo(CapabilityInfoIF):
self.session.flush() self.session.flush()
class CapabilityEngine(CapabilityEngineIF):
"""
Executes a prepare and run workflow step of a capability
"""
def __init__(self, workflow_service: "WorkflowService", execution: CapabilityExecution = None):
self.workflow_service = workflow_service
self.execution = execution
def execute(self):
"""
Communicate with workflow service and send a request to run a workflow with given settings
FIXME: Implement
"""
# workflow_service.execute(workflow_name, arguments, files)
class CapabilityQueue(CapabilityQueueIF): class CapabilityQueue(CapabilityQueueIF):
"""
Organizes capability executions in a priority order and makes it possible to control
the number of concurrent executions or pause execution altogether
"""
def __init__(self, max_concurrency: int): def __init__(self, max_concurrency: int):
self.queue = PriorityQueue() self.queue = PriorityQueue()
self.workflow_service = WorkflowService()
self.max_concurrency = max_concurrency self.max_concurrency = max_concurrency
self.engines = Queue()
for i in range(self.max_concurrency):
self.engines.put(CapabilityEngine(workflow_service=self.workflow_service))
self.paused = False self.paused = False
def process_execution(self):
"""
Process execution at the front of the queue; I imagine this would be called repeatedly
with a small delay
"""
if self.queue.empty() is False:
execution = self.queue.get()
if engine := self.get_available_engine():
# Place execution in engine
engine.execution = execution
# Start engine
engine.execute()
# TODO: Do I need to keep track of executing engines?
else:
# FIXME: Logging
print("No available engines. Try again later.")
def get_available_engine(self) -> Optional[CapabilityEngine]:
"""
Get an engine if one is available
:return: Engine if one is available, else False
"""
try:
return self.engines.get_nowait()
except Empty:
return None
def enqueue(self, request: CapabilityRequest, priority: int): def enqueue(self, request: CapabilityRequest, priority: int):
self.queue.put((priority, request)) self.queue.put((priority, request))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment