diff --git a/shared/workspaces/src/workspaces/capability_interfaces.py b/shared/workspaces/src/workspaces/capability_interfaces.py index 960f27d41b6647e6d21db8b8e166262b19ae0101..25b60c1ebdd697fafe15a6b1b06df9e3efd3cff1 100644 --- a/shared/workspaces/src/workspaces/capability_interfaces.py +++ b/shared/workspaces/src/workspaces/capability_interfaces.py @@ -90,10 +90,9 @@ class CapabilityInfoIF(ABC): ) -> CapabilityRequestIF: """ Create new capability request and save it in the database - :param capability_id: ID of the requested capability + :param capability_name: name of the requested capability :param parameters: List :param future_products: - :param versions: :return: Integer identifier of the request """ raise NotImplementedError @@ -165,7 +164,13 @@ class CapabilityEngineIF(ABC): """ @abstractmethod - def execute(self): + def execute(self, execution: CapabilityExecutionIF): + pass + + @abstractmethod + def submit_workflow_request( + self, workflow_name: str, workflow_args: dict, files: List["AbstractFile"] + ): pass @@ -243,7 +248,7 @@ class CapabilityStepIF(ABC): """ @abstractmethod - def execute(self, request: CapabilityRequestIF): + def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF): pass diff --git a/shared/workspaces/src/workspaces/helpers.py b/shared/workspaces/src/workspaces/helpers.py index 05956c4e626fbc3371c5fb15433198e622ce96d9..9c7a253a2b8dbfd4c6a50eb3522844106353bcdc 100644 --- a/shared/workspaces/src/workspaces/helpers.py +++ b/shared/workspaces/src/workspaces/helpers.py @@ -1,6 +1,7 @@ from __future__ import annotations import abc +import json from enum import Enum, auto from typing import Optional, List, Iterator @@ -8,7 +9,8 @@ from .capability_interfaces import ( ParameterIF, CapabilityStepIF, CapabilitySequenceIF, - CapabilityRequestIF, + CapabilityEngineIF, + CapabilityExecutionIF, ) from .product_interfaces import FutureProductIF @@ -134,32 +136,36 @@ class FutureProduct(FutureProductIF): class PrepareAndRunWorkflow(CapabilityStep): - def execute(self, request: CapabilityRequestIF): - pass + def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF): + workflow_name = self.step_value + workflow_args = self.step_args + # FIXME: Add support for files + files = [] + engine.submit_workflow_request(workflow_name, json.loads(workflow_args), files) class AwaitQa(CapabilityStep): - def execute(self, request: CapabilityRequestIF): + def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF): pass class AwaitWorkflow(CapabilityStep): - def execute(self, request: CapabilityRequestIF): + def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF): pass class AwaitProduct(CapabilityStep): - def execute(self, request: CapabilityRequestIF): + def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF): pass class AwaitParameter(CapabilityStep): - def execute(self, request: CapabilityRequestIF): + def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF): pass class AwaitLargeAllocationApproval(CapabilityStep): - def execute(self, request: CapabilityRequestIF): + def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF): pass diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index 91872435f3bf1a4c260b3b54fae1737bdf45c582..744b0cf05461def79b578923d1713777bfa4306c 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -8,7 +8,7 @@ from collections import namedtuple from pathlib import Path from queue import PriorityQueue, Queue, Empty from tempfile import mkdtemp -from typing import Dict, List, Optional, NamedTuple +from typing import Dict, List, Optional, NamedTuple, Union from channels.amqp_helpers import ( workflow_events, @@ -28,6 +28,7 @@ from .capability_interfaces import ( CapabilityEngineIF, CapabilityName, ParameterIF, + CapabilityExecutionIF, ) from .helpers import ( CapabilitySequence, @@ -53,8 +54,9 @@ from .schema import ( CapabilityVersion, Base, WorkflowRequestFile, + AbstractFile, ) -from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF +from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF, WorkflowIF class CapabilityService(CapabilityServiceIF): @@ -63,7 +65,7 @@ class CapabilityService(CapabilityServiceIF): """ def __init__( - self, capability_info: "CapabilityInfo", workflow_info: "WorkflowInfo" + self, capability_info: "CapabilityInfoIF", workflow_info: "WorkflowInfoIF" ): self.execution_pool = [] self.queues = {} @@ -239,7 +241,7 @@ class CapabilityInfo(CapabilityInfoIF): ) -> CapabilityRequest: """ Create new capability request and save it in the database - :param capability_id: ID of the requested capability + :param capability_name: name of the requested capability :param parameters: List :param future_products: :param versions: @@ -318,40 +320,29 @@ class CapabilityEngine(CapabilityEngineIF): def __init__( self, - capability_info: CapabilityInfo, - workflow_service: "WorkflowService", - execution_id: Optional[int] = None, + capability_info: CapabilityInfoIF, + workflow_service: "WorkflowServiceIF", ): self.capability_info = capability_info self.workflow_service = workflow_service - self.execution_id = execution_id - if self.execution_id: - self.execution = self.capability_info.lookup_execution(execution_id) - def execute(self): + def execute(self, execution: CapabilityExecutionIF): """ Communicate with workflow service and send a request to run a workflow with given settings """ - step_sequence = CapabilitySequence.from_str(self.execution.steps) + step_sequence = CapabilitySequence.from_str(execution.steps) # Grab value of current step (workflow name) - cur_step = step_sequence[int(self.execution.current_step)] - workflow_name = cur_step.step_value - workflow_args = cur_step.step_args - # FIXME: Add support for files - files = [] + cur_step = step_sequence[int(execution.current_step)] + cur_step.execute(self, execution) - workflow_request = self.workflow_service.info.create_workflow_request(workflow_name, workflow_args) + def submit_workflow_request( + self, workflow_name: str, workflow_args: dict, files: List[AbstractFile] + ): + workflow_request = self.workflow_service.info.create_workflow_request( + workflow_name, workflow_args + ) self.workflow_service.execute(workflow_request, files) - def is_complete(self): - state = self.capability_info.lookup_execution(self.execution.id).state - - if state == ExecutionState.Complete.name: - # Execution completed execution - return True - else: - return False - class CapabilityQueue(CapabilityQueueIF): """ @@ -367,6 +358,8 @@ class CapabilityQueue(CapabilityQueueIF): workflow_info: "WorkflowInfo", max_concurrency: int, ): + self.capability_info = capability_info + self.workflow_info = workflow_info self.queue = PriorityQueue() self.max_concurrency = max_concurrency self.engine_list = self.init_engines( @@ -402,10 +395,9 @@ class CapabilityQueue(CapabilityQueueIF): execution_id = self.queue.get() engine = self.get_available_engine() if engine: - # Place execution in engine - engine.execution_id = execution_id + execution = self.capability_info.lookup_execution(execution_id) # Start engine - engine.execute() + engine.execute(execution) # Move engine to in-use list self.engine_list.in_use.append(engine) else: @@ -471,7 +463,7 @@ class WorkflowService(WorkflowServiceIF): # WorkflowEvents that we receive from wf_monitor as the workflow # execution evolves. So we have to set up listening at some point # in this class - def __init__(self, info: "WorkflowInfo"): + def __init__(self, info: "WorkflowInfoIF"): self.info = info # Start listening for events from the wf_monitor stream @@ -480,7 +472,7 @@ class WorkflowService(WorkflowServiceIF): ) self.listener.start() - def execute(self, request: WorkflowRequest, files: List[Path] = None): + def execute(self, request: WorkflowRequest, files: List[AbstractFile] = None): """ Execute a workflow per the supplied parameters. """ @@ -516,7 +508,7 @@ class WorkflowService(WorkflowServiceIF): """ # 1. create a temporary directory # TODO: decide where to put temp directory and make capo property - temp_folder = Path(mkdtemp(dir=str(Path.home())+"/workspaces_tmp")) + temp_folder = Path(mkdtemp(dir=str(Path.home()) + "/workspaces_tmp")) # 2. spool each of the temp files to it for file in files: @@ -608,7 +600,7 @@ class WorkflowInfo(WorkflowInfoIF): return self.session.query(WorkflowRequest).all() def create_workflow_request( - self, workflow: Workflow, argument: Dict + self, workflow: Union[str, WorkflowIF], argument: Dict ) -> WorkflowRequest: """ Create new workflow request and save to database @@ -616,8 +608,15 @@ class WorkflowInfo(WorkflowInfoIF): :param argument: workflow arguments :return: new WorkflowRequest """ + + # if it's an object with a workflow_name, then that's what we use, + # otherwise, the argument must be already be a string + workflow_name = ( + workflow.workflow_name if hasattr(workflow, "workflow_name") else workflow + ) + request = WorkflowRequest( - workflow_name=workflow.workflow_name, + workflow_name=workflow_name, argument=argument, state=WorkflowRequestState.Created.name, ) diff --git a/shared/workspaces/src/workspaces/workflow_interfaces.py b/shared/workspaces/src/workspaces/workflow_interfaces.py index 5f81538fe6a8622e174875299126346ffaa0d476..5ce3af2cd9db8a402a057f2dea2913ada97d4b7c 100644 --- a/shared/workspaces/src/workspaces/workflow_interfaces.py +++ b/shared/workspaces/src/workspaces/workflow_interfaces.py @@ -5,8 +5,7 @@ Interfaces used by the Workspaces' workflow system. import inspect from abc import ABC, abstractmethod from pathlib import Path -from typing import Dict, List - +from typing import Dict, List, Union """ Workflow System @@ -44,7 +43,7 @@ class WorkflowServiceIF(ABC): """ @abstractmethod - def execute(self, request: WorkflowRequestIF, files: List[Path]): + def execute(self, request: WorkflowRequestIF, files: List["AbstractFile"]): """ Execute this workflow against these files. @@ -91,7 +90,7 @@ class WorkflowInfoIF(ABC): @abstractmethod def create_workflow_request( - self, workflow: WorkflowIF, argument: Dict + self, workflow: Union[str, WorkflowIF], argument: Dict ) -> WorkflowRequestIF: """ Create new workflow request and save to database diff --git a/shared/workspaces/test/test_capability_services.py b/shared/workspaces/test/test_capability_services.py index 7108cb1ad68d3d0ff596f3b258afa50330ad811d..2407e2654c9d36d021cfc99ec80fe4abf0dc5ade 100644 --- a/shared/workspaces/test/test_capability_services.py +++ b/shared/workspaces/test/test_capability_services.py @@ -7,10 +7,10 @@ from workspaces.capability_interfaces import ( CapabilityRequestIF, CapabilityName, ) -from workspaces.helpers import ExecutionState +from workspaces.helpers import ExecutionState, CapabilityStep from workspaces.product_interfaces import FutureProductIF from workspaces.schema import Capability, CapabilityExecution -from workspaces.services import CapabilityService +from workspaces.services import CapabilityService, CapabilityEngine, WorkflowService from .test_workflow_services import FakeWorkflowInfo @@ -82,3 +82,12 @@ def test_capability_request_creation(): cs = CapabilityService(capability_info, workflow_info) req = cs.create_request("null") assert req in capability_info.capabilities[0].requests + + +def test_prepare_and_run(): + cs = CapabilityService(capability_info, workflow_info) + ws = WorkflowService(workflow_info) + engine = CapabilityEngine(capability_info, ws) + step = CapabilityStep.from_str("prepare-and-run-workflow null {}") + execution = CapabilityExecution() + step.execute(engine, execution) diff --git a/shared/workspaces/test/test_capability_steps.py b/shared/workspaces/test/test_capability_steps.py index 3790bd0876233896ce8ab968170224d3be4db1fb..1cb8c62e58b7d455b973c4d8a7b89c58d0d76a48 100644 --- a/shared/workspaces/test/test_capability_steps.py +++ b/shared/workspaces/test/test_capability_steps.py @@ -14,8 +14,3 @@ def test_parse_workflow(): assert isinstance(step, AwaitParameter) step = CapabilityStep.from_str("await-large-alloc-approval foo bar") assert isinstance(step, AwaitLargeAllocationApproval) - - -def test_prepare_and_run(): - step = CapabilityStep.from_str("prepare-and-run-workflow null {}") - step.execute() diff --git a/shared/workspaces/test/test_workflow_services.py b/shared/workspaces/test/test_workflow_services.py index a76fab4236acc91783fefd957228c47f289e4643..60969ffa631ce4bc6e5773ed2999bd325ff245fe 100644 --- a/shared/workspaces/test/test_workflow_services.py +++ b/shared/workspaces/test/test_workflow_services.py @@ -1,4 +1,4 @@ -from typing import Dict, List +from typing import Dict, List, Union import json import requests @@ -35,11 +35,12 @@ class FakeWorkflowInfo(WorkflowInfoIF): return [self.lookup_workflow_definition("null")] def create_workflow_request( - self, workflow: WorkflowIF, argument: Dict + self, workflow: Union[str, WorkflowIF], argument: Dict ) -> WorkflowRequestIF: - request = WorkflowRequest( - workflow_name=workflow.workflow_name, argument=argument + workflow_name = ( + workflow.workflow_name if hasattr(workflow, "workflow_name") else workflow ) + request = WorkflowRequest(workflow_name=workflow_name, argument=argument) self.save_request(request) return request