Skip to content
Snippets Groups Projects
Commit 88ef6ef5 authored by Daniel Lyons's avatar Daniel Lyons Committed by Charlotte Hausman
Browse files

drill a hole through five interfaces to get an existing router

parent f499dff4
No related branches found
No related tags found
1 merge request!306Await QA
......@@ -199,13 +199,7 @@ class AwaitQa(CapabilityStep):
:param execution: Corresponding capability execution
"""
NotificationServiceRESTClient().notify_qa_ready(execution.capability_request)
step_complete_msg = {
"service": "capability",
"routing_key": "capability",
"subject": execution.__json__(),
"type": "step-complete",
}
Router("capability").send_message(**step_complete_msg)
engine.step_complete(execution)
class AwaitWorkflow(CapabilityStep):
......
......@@ -12,6 +12,8 @@ from workspaces.capability.services.interfaces import (
from workspaces.system.schema import AbstractFile
from workspaces.workflow.services.interfaces import WorkflowServiceIF
from shared.messaging.messaging.router import Router
logger = logging.getLogger(__name__)
......@@ -24,10 +26,12 @@ class CapabilityEngine(CapabilityEngineIF):
self,
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
message_router: Router
):
self.capability_info = capability_info
self.workflow_service = workflow_service
self.execution = None
self.message_router = message_router
def load_engine(self, execution: CapabilityExecutionIF):
"""
......@@ -96,3 +100,12 @@ class CapabilityEngine(CapabilityEngineIF):
self.capability_info.save_execution(execution)
transaction.commit()
self.workflow_service.execute(workflow_request)
def step_complete(self, execution: CapabilityExecutionIF):
step_complete_msg = {
"service": "capability",
"routing_key": "capability",
"subject": execution.__json__(),
"type": "step-complete",
}
self.message_router.send_message(**step_complete_msg)
......@@ -14,6 +14,8 @@ from workspaces.capability.services.interfaces import (
)
from workspaces.workflow.services.interfaces import WorkflowServiceIF
from shared.messaging.messaging.router import Router
logger = logging.getLogger(__name__)
......@@ -30,11 +32,12 @@ class CapabilityQueue(CapabilityQueueIF):
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
max_concurrency: int,
message_router: Router
):
self.capability_info = capability_info
self.queue = PriorityQueue()
self.max_concurrency = max_concurrency
self.engine_list = self.init_engines(max_concurrency, capability_info, workflow_service)
self.engine_list = self.init_engines(max_concurrency, capability_info, workflow_service, message_router)
self.queue_manager = threading.Thread(target=self.process_executions, daemon=True)
self.queue_manager.start()
self.paused = False
......@@ -45,6 +48,7 @@ class CapabilityQueue(CapabilityQueueIF):
num_engines: int,
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
message_router: Router
) -> EngineList:
"""
Initialize available engine queue and list of in-use engines
......@@ -53,7 +57,7 @@ class CapabilityQueue(CapabilityQueueIF):
"""
engine_list = cls.EngineList(available=Queue(), in_use=[])
for _ in range(num_engines):
engine_list.available.put(CapabilityEngine(capability_info, workflow_service))
engine_list.available.put(CapabilityEngine(capability_info, workflow_service, message_router))
return engine_list
......
......@@ -231,6 +231,7 @@ class ExecutionManager:
self.capability_info,
self.workflow_service,
execution.capability.max_jobs,
self.message_router
),
)
queue.enqueue(execution, priority)
......
......@@ -142,6 +142,16 @@ class CapabilityEngineIF(ABC):
):
pass
@abstractmethod
def step_complete(self, execution: CapabilityExecutionIF):
"""
Announce that the step is complete. A bit of a kludge, steps should probably
not know they are complete during the time they are being handled by the CapabilityEngine.
:param execution: the execution which needs to be advanced
"""
pass
class CapabilityServiceIF(ABC):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment