Skip to content
Snippets Groups Projects
Commit 397d8ade authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Add execute method and flush out implementation on prepare-and-run

Widen create_workflow_request API
Move tests around
Fix some APIs to use interfaces instead of implementations
parent 6b0b3cdf
No related branches found
No related tags found
No related merge requests found
......@@ -90,10 +90,9 @@ class CapabilityInfoIF(ABC):
) -> CapabilityRequestIF:
"""
Create new capability request and save it in the database
:param capability_id: ID of the requested capability
:param capability_name: name of the requested capability
:param parameters: List
:param future_products:
:param versions:
:return: Integer identifier of the request
"""
raise NotImplementedError
......@@ -165,7 +164,13 @@ class CapabilityEngineIF(ABC):
"""
@abstractmethod
def execute(self):
def execute(self, execution: CapabilityExecutionIF):
pass
@abstractmethod
def submit_workflow_request(
self, workflow_name: str, workflow_args: dict, files: List["AbstractFile"]
):
pass
......@@ -243,7 +248,7 @@ class CapabilityStepIF(ABC):
"""
@abstractmethod
def execute(self, request: CapabilityRequestIF):
def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF):
pass
......
from __future__ import annotations
import abc
import json
from enum import Enum, auto
from typing import Optional, List, Iterator
......@@ -8,7 +9,8 @@ from .capability_interfaces import (
ParameterIF,
CapabilityStepIF,
CapabilitySequenceIF,
CapabilityRequestIF,
CapabilityEngineIF,
CapabilityExecutionIF,
)
from .product_interfaces import FutureProductIF
......@@ -134,32 +136,36 @@ class FutureProduct(FutureProductIF):
class PrepareAndRunWorkflow(CapabilityStep):
def execute(self, request: CapabilityRequestIF):
pass
def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF):
workflow_name = self.step_value
workflow_args = self.step_args
# FIXME: Add support for files
files = []
engine.submit_workflow_request(workflow_name, json.loads(workflow_args), files)
class AwaitQa(CapabilityStep):
def execute(self, request: CapabilityRequestIF):
def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF):
pass
class AwaitWorkflow(CapabilityStep):
def execute(self, request: CapabilityRequestIF):
def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF):
pass
class AwaitProduct(CapabilityStep):
def execute(self, request: CapabilityRequestIF):
def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF):
pass
class AwaitParameter(CapabilityStep):
def execute(self, request: CapabilityRequestIF):
def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF):
pass
class AwaitLargeAllocationApproval(CapabilityStep):
def execute(self, request: CapabilityRequestIF):
def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF):
pass
......
......@@ -8,7 +8,7 @@ from collections import namedtuple
from pathlib import Path
from queue import PriorityQueue, Queue, Empty
from tempfile import mkdtemp
from typing import Dict, List, Optional, NamedTuple
from typing import Dict, List, Optional, NamedTuple, Union
from channels.amqp_helpers import (
workflow_events,
......@@ -28,6 +28,7 @@ from .capability_interfaces import (
CapabilityEngineIF,
CapabilityName,
ParameterIF,
CapabilityExecutionIF,
)
from .helpers import (
CapabilitySequence,
......@@ -53,8 +54,9 @@ from .schema import (
CapabilityVersion,
Base,
WorkflowRequestFile,
AbstractFile,
)
from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF
from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF, WorkflowIF
class CapabilityService(CapabilityServiceIF):
......@@ -63,7 +65,7 @@ class CapabilityService(CapabilityServiceIF):
"""
def __init__(
self, capability_info: "CapabilityInfo", workflow_info: "WorkflowInfo"
self, capability_info: "CapabilityInfoIF", workflow_info: "WorkflowInfoIF"
):
self.execution_pool = []
self.queues = {}
......@@ -239,7 +241,7 @@ class CapabilityInfo(CapabilityInfoIF):
) -> CapabilityRequest:
"""
Create new capability request and save it in the database
:param capability_id: ID of the requested capability
:param capability_name: name of the requested capability
:param parameters: List
:param future_products:
:param versions:
......@@ -318,40 +320,29 @@ class CapabilityEngine(CapabilityEngineIF):
def __init__(
self,
capability_info: CapabilityInfo,
workflow_service: "WorkflowService",
execution_id: Optional[int] = None,
capability_info: CapabilityInfoIF,
workflow_service: "WorkflowServiceIF",
):
self.capability_info = capability_info
self.workflow_service = workflow_service
self.execution_id = execution_id
if self.execution_id:
self.execution = self.capability_info.lookup_execution(execution_id)
def execute(self):
def execute(self, execution: CapabilityExecutionIF):
"""
Communicate with workflow service and send a request to run a workflow with given settings
"""
step_sequence = CapabilitySequence.from_str(self.execution.steps)
step_sequence = CapabilitySequence.from_str(execution.steps)
# Grab value of current step (workflow name)
cur_step = step_sequence[int(self.execution.current_step)]
workflow_name = cur_step.step_value
workflow_args = cur_step.step_args
# FIXME: Add support for files
files = []
cur_step = step_sequence[int(execution.current_step)]
cur_step.execute(self, execution)
workflow_request = self.workflow_service.info.create_workflow_request(workflow_name, workflow_args)
def submit_workflow_request(
self, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
):
workflow_request = self.workflow_service.info.create_workflow_request(
workflow_name, workflow_args
)
self.workflow_service.execute(workflow_request, files)
def is_complete(self):
state = self.capability_info.lookup_execution(self.execution.id).state
if state == ExecutionState.Complete.name:
# Execution completed execution
return True
else:
return False
class CapabilityQueue(CapabilityQueueIF):
"""
......@@ -367,6 +358,8 @@ class CapabilityQueue(CapabilityQueueIF):
workflow_info: "WorkflowInfo",
max_concurrency: int,
):
self.capability_info = capability_info
self.workflow_info = workflow_info
self.queue = PriorityQueue()
self.max_concurrency = max_concurrency
self.engine_list = self.init_engines(
......@@ -402,10 +395,9 @@ class CapabilityQueue(CapabilityQueueIF):
execution_id = self.queue.get()
engine = self.get_available_engine()
if engine:
# Place execution in engine
engine.execution_id = execution_id
execution = self.capability_info.lookup_execution(execution_id)
# Start engine
engine.execute()
engine.execute(execution)
# Move engine to in-use list
self.engine_list.in_use.append(engine)
else:
......@@ -471,7 +463,7 @@ class WorkflowService(WorkflowServiceIF):
# WorkflowEvents that we receive from wf_monitor as the workflow
# execution evolves. So we have to set up listening at some point
# in this class
def __init__(self, info: "WorkflowInfo"):
def __init__(self, info: "WorkflowInfoIF"):
self.info = info
# Start listening for events from the wf_monitor stream
......@@ -480,7 +472,7 @@ class WorkflowService(WorkflowServiceIF):
)
self.listener.start()
def execute(self, request: WorkflowRequest, files: List[Path] = None):
def execute(self, request: WorkflowRequest, files: List[AbstractFile] = None):
"""
Execute a workflow per the supplied parameters.
"""
......@@ -516,7 +508,7 @@ class WorkflowService(WorkflowServiceIF):
"""
# 1. create a temporary directory
# TODO: decide where to put temp directory and make capo property
temp_folder = Path(mkdtemp(dir=str(Path.home())+"/workspaces_tmp"))
temp_folder = Path(mkdtemp(dir=str(Path.home()) + "/workspaces_tmp"))
# 2. spool each of the temp files to it
for file in files:
......@@ -608,7 +600,7 @@ class WorkflowInfo(WorkflowInfoIF):
return self.session.query(WorkflowRequest).all()
def create_workflow_request(
self, workflow: Workflow, argument: Dict
self, workflow: Union[str, WorkflowIF], argument: Dict
) -> WorkflowRequest:
"""
Create new workflow request and save to database
......@@ -616,8 +608,15 @@ class WorkflowInfo(WorkflowInfoIF):
:param argument: workflow arguments
:return: new WorkflowRequest
"""
# if it's an object with a workflow_name, then that's what we use,
# otherwise, the argument must be already be a string
workflow_name = (
workflow.workflow_name if hasattr(workflow, "workflow_name") else workflow
)
request = WorkflowRequest(
workflow_name=workflow.workflow_name,
workflow_name=workflow_name,
argument=argument,
state=WorkflowRequestState.Created.name,
)
......
......@@ -5,8 +5,7 @@ Interfaces used by the Workspaces' workflow system.
import inspect
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List
from typing import Dict, List, Union
"""
Workflow System
......@@ -44,7 +43,7 @@ class WorkflowServiceIF(ABC):
"""
@abstractmethod
def execute(self, request: WorkflowRequestIF, files: List[Path]):
def execute(self, request: WorkflowRequestIF, files: List["AbstractFile"]):
"""
Execute this workflow against these files.
......@@ -91,7 +90,7 @@ class WorkflowInfoIF(ABC):
@abstractmethod
def create_workflow_request(
self, workflow: WorkflowIF, argument: Dict
self, workflow: Union[str, WorkflowIF], argument: Dict
) -> WorkflowRequestIF:
"""
Create new workflow request and save to database
......
......@@ -7,10 +7,10 @@ from workspaces.capability_interfaces import (
CapabilityRequestIF,
CapabilityName,
)
from workspaces.helpers import ExecutionState
from workspaces.helpers import ExecutionState, CapabilityStep
from workspaces.product_interfaces import FutureProductIF
from workspaces.schema import Capability, CapabilityExecution
from workspaces.services import CapabilityService
from workspaces.services import CapabilityService, CapabilityEngine, WorkflowService
from .test_workflow_services import FakeWorkflowInfo
......@@ -82,3 +82,12 @@ def test_capability_request_creation():
cs = CapabilityService(capability_info, workflow_info)
req = cs.create_request("null")
assert req in capability_info.capabilities[0].requests
def test_prepare_and_run():
cs = CapabilityService(capability_info, workflow_info)
ws = WorkflowService(workflow_info)
engine = CapabilityEngine(capability_info, ws)
step = CapabilityStep.from_str("prepare-and-run-workflow null {}")
execution = CapabilityExecution()
step.execute(engine, execution)
......@@ -14,8 +14,3 @@ def test_parse_workflow():
assert isinstance(step, AwaitParameter)
step = CapabilityStep.from_str("await-large-alloc-approval foo bar")
assert isinstance(step, AwaitLargeAllocationApproval)
def test_prepare_and_run():
step = CapabilityStep.from_str("prepare-and-run-workflow null {}")
step.execute()
from typing import Dict, List
from typing import Dict, List, Union
import json
from workspaces.schema import (
......@@ -33,11 +33,12 @@ class FakeWorkflowInfo(WorkflowInfoIF):
return [self.lookup_workflow_definition("null")]
def create_workflow_request(
self, workflow: WorkflowIF, argument: Dict
self, workflow: Union[str, WorkflowIF], argument: Dict
) -> WorkflowRequestIF:
request = WorkflowRequest(
workflow_name=workflow.workflow_name, argument=argument
workflow_name = (
workflow.workflow_name if hasattr(workflow, "workflow_name") else workflow
)
request = WorkflowRequest(workflow_name=workflow_name, argument=argument)
self.save_request(request)
return request
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment