Skip to content
Snippets Groups Projects
Commit fd37b196 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

CapabilityQueue: added threaded queue manager, fixed bugs that were in

the way of things working smoothly
parent daab66f5
No related branches found
No related tags found
No related merge requests found
......@@ -67,8 +67,8 @@ class CapabilityStep(CapabilityStepIF, abc.ABC):
"""
step_list = step_string.split(" ")
step_type = CapabilityStepType.from_string(step_list[0])
step_value = step_list[1] if step_list[1] else None
step_args = step_list[2] if step_list[2] else None
step_value = step_list[1] if len(step_list) == 2 else None
step_args = step_list[2] if len(step_list) == 3 else None
return cls.TYPES[step_type](step_type, step_value, step_args)
def __str__(self):
......@@ -96,7 +96,7 @@ class CapabilitySequence(CapabilitySequenceIF):
:return: CapabilitySequence of given steps
"""
steps = []
for step in sequence_str.split(","):
for step in sequence_str.split("\n"):
steps.append(CapabilityStep.from_str(step))
return cls(steps)
......@@ -137,12 +137,13 @@ class FutureProduct(FutureProductIF):
class PrepareAndRunWorkflow(CapabilityStep):
def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF):
print("Executing workflow!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
workflow_name = self.step_value
workflow_args = self.step_args
# FIXME: Add support for files
files = []
engine.submit_workflow_request(workflow_name, json.loads(workflow_args), files)
if workflow_args is not None:
workflow_args = json.loads(workflow_args)
engine.submit_workflow_request(workflow_name, workflow_args, files)
class AwaitQa(CapabilityStep):
......
......@@ -526,6 +526,6 @@ def get_session_factory(engine):
:param engine:
:return:
"""
factory = sessionmaker()
factory = sessionmaker(expire_on_commit=False)
factory.configure(bind=engine)
return factory
......@@ -4,6 +4,7 @@ Services defined by the Workspaces system, to be used by our APIs and client pro
import stat
import threading
import subprocess
import time
from collections import namedtuple
from pathlib import Path
from queue import PriorityQueue, Queue, Empty
......@@ -379,6 +380,8 @@ class CapabilityQueue(CapabilityQueueIF):
self.engine_list = self.init_engines(
max_concurrency, capability_info, workflow_service
)
self.queue_manager = threading.Thread(target=self.process_executions, daemon=True)
self.queue_manager.start()
self.paused = False
@classmethod
......@@ -400,23 +403,25 @@ class CapabilityQueue(CapabilityQueueIF):
return engine_list
def process_execution(self):
def process_executions(self):
"""
Process execution at the front of the queue; I imagine this would be called repeatedly
with a small delay
"""
if self.queue.empty() is False:
execution_id = self.queue.get()
engine = self.get_available_engine()
if engine:
execution = self.capability_info.lookup_execution(execution_id)
# Start engine
engine.execute(execution)
# Move engine to in-use list
self.engine_list.in_use.append(engine)
else:
# FIXME: Logging
print("No available engines. Try again later.")
while True:
if self.queue.empty() is False:
execution = self.queue.get()[1]
engine = self.get_available_engine()
if engine:
execution = self.capability_info.lookup_execution(execution.id)
# Start engine
engine.execute(execution)
# Move engine to in-use list
self.engine_list.in_use.append(engine)
else:
# FIXME: Logging
print("No available engines. Try again later.")
time.sleep(0.5)
def find_engine(self, execution_id: int) -> CapabilityEngine:
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment