diff --git a/shared/workspaces/workspaces/capability/services/capability_engine.py b/shared/workspaces/workspaces/capability/services/capability_engine.py index 8e98ce4e8b498c12e9cfe07976ebcb8670066b69..026b701b3fffea7447737c834bc3109ff8722431 100644 --- a/shared/workspaces/workspaces/capability/services/capability_engine.py +++ b/shared/workspaces/workspaces/capability/services/capability_engine.py @@ -2,6 +2,7 @@ import logging from typing import List import transaction +from messaging.router import Router from workspaces.capability.helpers import CapabilitySequence from workspaces.capability.schema_interfaces import CapabilityExecutionIF @@ -12,8 +13,6 @@ from workspaces.capability.services.interfaces import ( from workspaces.system.schema import AbstractFile from workspaces.workflow.services.interfaces import WorkflowServiceIF -from shared.messaging.messaging.router import Router - logger = logging.getLogger(__name__) @@ -26,7 +25,7 @@ class CapabilityEngine(CapabilityEngineIF): self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF, - message_router: Router + message_router: Router, ): self.capability_info = capability_info self.workflow_service = workflow_service diff --git a/shared/workspaces/workspaces/capability/services/capability_queue.py b/shared/workspaces/workspaces/capability/services/capability_queue.py index de1e28d939fa8c6e82252abd2d28c8943c3516a1..c7dcc686d49198657281b3a9883141782ecf39a3 100644 --- a/shared/workspaces/workspaces/capability/services/capability_queue.py +++ b/shared/workspaces/workspaces/capability/services/capability_queue.py @@ -5,6 +5,8 @@ from collections import namedtuple from queue import Empty, PriorityQueue, Queue from typing import NamedTuple, Optional +from messaging.router import Router + from workspaces.capability.enums import ExecutionState from workspaces.capability.schema_interfaces import CapabilityExecutionIF from workspaces.capability.services.capability_engine import CapabilityEngine @@ -14,8 +16,6 @@ from workspaces.capability.services.interfaces import ( ) from workspaces.workflow.services.interfaces import WorkflowServiceIF -from shared.messaging.messaging.router import Router - logger = logging.getLogger(__name__) @@ -32,12 +32,14 @@ class CapabilityQueue(CapabilityQueueIF): capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF, max_concurrency: int, - message_router: Router + message_router: Router, ): self.capability_info = capability_info self.queue = PriorityQueue() self.max_concurrency = max_concurrency - self.engine_list = self.init_engines(max_concurrency, capability_info, workflow_service, message_router) + self.engine_list = self.init_engines( + max_concurrency, capability_info, workflow_service, message_router + ) self.queue_manager = threading.Thread(target=self.process_executions, daemon=True) self.queue_manager.start() self.paused = False @@ -48,7 +50,7 @@ class CapabilityQueue(CapabilityQueueIF): num_engines: int, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF, - message_router: Router + message_router: Router, ) -> EngineList: """ Initialize available engine queue and list of in-use engines @@ -57,7 +59,9 @@ class CapabilityQueue(CapabilityQueueIF): """ engine_list = cls.EngineList(available=Queue(), in_use=[]) for _ in range(num_engines): - engine_list.available.put(CapabilityEngine(capability_info, workflow_service, message_router)) + engine_list.available.put( + CapabilityEngine(capability_info, workflow_service, message_router) + ) return engine_list