Skip to content
Snippets Groups Projects
Commit ddd952f2 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Add execute to step, and step class types

parent e210c624
No related branches found
No related tags found
No related merge requests found
......@@ -242,7 +242,9 @@ class CapabilityStepIF(ABC):
A step in a capability sequence
"""
pass
@abstractmethod
def execute(self):
pass
class FutureCapabilityResultIF(ProductIF):
......
from __future__ import annotations
import abc
from enum import Enum, auto
from typing import Optional, List, Iterator
......@@ -12,12 +13,39 @@ from .capability_interfaces import (
from .product_interfaces import FutureProductIF
class CapabilityStep(CapabilityStepIF):
class CapabilityStepType(Enum):
"""
Enum that specifies the types of CapabilitySteps that are possible
"""
PrepareAndRunWorkflow = 0
AwaitQA = 1
AwaitWorkflow = 2
AwaitProduct = 3
AwaitParameter = 4
AwaitLargeAllocApproval = 5
@classmethod
def from_string(cls, string: str) -> CapabilityStepType:
strings = {
"prepare-and-run-workflow": cls.PrepareAndRunWorkflow,
"await-qa": cls.AwaitQA,
"await-workflow": cls.AwaitWorkflow,
"await-product": cls.AwaitProduct,
"await-parameter": cls.AwaitParameter,
"await-large-alloc-approval": cls.AwaitLargeAllocApproval,
}
return strings[string]
class CapabilityStep(CapabilityStepIF, abc.ABC):
"""
Class that represents a step in a capability sequence. A step is of a certain type and
certain step types can have a value
"""
TYPES = {}
def __init__(
self,
step_type: CapabilityStepType,
......@@ -39,7 +67,7 @@ class CapabilityStep(CapabilityStepIF):
step_type = CapabilityStepType.from_string(step_list[0])
step_value = step_list[1] if step_list[1] else None
step_args = step_list[2] if step_list[2] else None
return cls(step_type, step_value, step_args)
return cls.TYPES[step_type](step_type, step_value, step_args)
def __str__(self):
string = f"{self.step_type.name}"
......@@ -105,30 +133,44 @@ class FutureProduct(FutureProductIF):
pass
class CapabilityStepType(Enum):
"""
Enum that specifies the types of CapabilitySteps that are possible
"""
class PrepareAndRunWorkflow(CapabilityStep):
def execute(self):
pass
PrepareAndRunWorkflow = 0
AwaitQA = 1
AwaitWorkflow = 2
AwaitProduct = 3
AwaitParameter = 4
AwaitLargeAllocApproval = 5
@classmethod
def from_string(cls, string: str) -> CapabilityStepType:
print(string)
strings = {
"prepare-and-run-workflow": cls.PrepareAndRunWorkflow,
"await-qa": cls.AwaitQA,
"await-workflow": cls.AwaitWorkflow,
"await-product": cls.AwaitProduct,
"await-parameter": cls.AwaitParameter,
"await-large-alloc-approval": cls.AwaitLargeAllocApproval,
}
return strings[string]
class AwaitQa(CapabilityStep):
def execute(self):
pass
class AwaitWorkflow(CapabilityStep):
def execute(self):
pass
class AwaitProduct(CapabilityStep):
def execute(self):
pass
class AwaitParameter(CapabilityStep):
def execute(self):
pass
class AwaitLargeAllocationApproval(CapabilityStep):
def execute(self):
pass
CapabilityStep.TYPES = {
CapabilityStepType.PrepareAndRunWorkflow: PrepareAndRunWorkflow,
CapabilityStepType.AwaitQA: AwaitQa,
CapabilityStepType.AwaitWorkflow: AwaitWorkflow,
CapabilityStepType.AwaitProduct: AwaitProduct,
CapabilityStepType.AwaitParameter: AwaitParameter,
CapabilityStepType.AwaitLargeAllocApproval: AwaitLargeAllocationApproval,
}
class CapabilityEventType(Enum):
......
......@@ -16,6 +16,11 @@ from .test_workflow_services import FakeWorkflowInfo
class FakeCapabilityInfo(CapabilityInfoIF):
def lookup_execution_by_workflow_request_id(
self, workflow_request_id: int
) -> CapabilityExecutionIF:
raise NotImplementedError
capabilities = [
Capability(
id=1,
......
from workspaces.helpers import *
def test_parse_workflow():
step = CapabilityStep.from_str("prepare-and-run-workflow foo bar")
assert isinstance(step, PrepareAndRunWorkflow)
step = CapabilityStep.from_str("await-qa foo bar")
assert isinstance(step, AwaitQa)
step = CapabilityStep.from_str("await-workflow foo bar")
assert isinstance(step, AwaitWorkflow)
step = CapabilityStep.from_str("await-product foo bar")
assert isinstance(step, AwaitProduct)
step = CapabilityStep.from_str("await-parameter foo bar")
assert isinstance(step, AwaitParameter)
step = CapabilityStep.from_str("await-large-alloc-approval foo bar")
assert isinstance(step, AwaitLargeAllocationApproval)
def test_prepare_and_run():
step = CapabilityStep.from_str("prepare-and-run-workflow null {}")
step.execute()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment