Skip to content
Snippets Groups Projects
Commit ce28ed48 authored by Nathan Hertz's avatar Nathan Hertz Committed by Charlotte Hausman
Browse files

Fixed imports

parent 88ef6ef5
No related branches found
No related tags found
1 merge request!306Await QA
This commit is part of merge request !306. Comments created here will be created in the context of that merge request.
......@@ -2,6 +2,7 @@ import logging
from typing import List
import transaction
from messaging.router import Router
from workspaces.capability.helpers import CapabilitySequence
from workspaces.capability.schema_interfaces import CapabilityExecutionIF
......@@ -12,8 +13,6 @@ from workspaces.capability.services.interfaces import (
from workspaces.system.schema import AbstractFile
from workspaces.workflow.services.interfaces import WorkflowServiceIF
from shared.messaging.messaging.router import Router
logger = logging.getLogger(__name__)
......@@ -26,7 +25,7 @@ class CapabilityEngine(CapabilityEngineIF):
self,
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
message_router: Router
message_router: Router,
):
self.capability_info = capability_info
self.workflow_service = workflow_service
......
......@@ -5,6 +5,8 @@ from collections import namedtuple
from queue import Empty, PriorityQueue, Queue
from typing import NamedTuple, Optional
from messaging.router import Router
from workspaces.capability.enums import ExecutionState
from workspaces.capability.schema_interfaces import CapabilityExecutionIF
from workspaces.capability.services.capability_engine import CapabilityEngine
......@@ -14,8 +16,6 @@ from workspaces.capability.services.interfaces import (
)
from workspaces.workflow.services.interfaces import WorkflowServiceIF
from shared.messaging.messaging.router import Router
logger = logging.getLogger(__name__)
......@@ -32,12 +32,14 @@ class CapabilityQueue(CapabilityQueueIF):
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
max_concurrency: int,
message_router: Router
message_router: Router,
):
self.capability_info = capability_info
self.queue = PriorityQueue()
self.max_concurrency = max_concurrency
self.engine_list = self.init_engines(max_concurrency, capability_info, workflow_service, message_router)
self.engine_list = self.init_engines(
max_concurrency, capability_info, workflow_service, message_router
)
self.queue_manager = threading.Thread(target=self.process_executions, daemon=True)
self.queue_manager.start()
self.paused = False
......@@ -48,7 +50,7 @@ class CapabilityQueue(CapabilityQueueIF):
num_engines: int,
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
message_router: Router
message_router: Router,
) -> EngineList:
"""
Initialize available engine queue and list of in-use engines
......@@ -57,7 +59,9 @@ class CapabilityQueue(CapabilityQueueIF):
"""
engine_list = cls.EngineList(available=Queue(), in_use=[])
for _ in range(num_engines):
engine_list.available.put(CapabilityEngine(capability_info, workflow_service, message_router))
engine_list.available.put(
CapabilityEngine(capability_info, workflow_service, message_router)
)
return engine_list
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment