Skip to content
Snippets Groups Projects
Commit 2430da91 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Continued updating interfaces.py to bring things closer and closer to

the architecture we want; updated Capability and CapabilityRequest in schema.py
parent b998b426
No related branches found
No related tags found
No related merge requests found
......@@ -28,52 +28,17 @@ class CapabilityIF(ABC):
A capability, which is a particular workflow setup, intended to accept
a certain kind of product and some parameters and produce another product
"""
def __init__(self, name: CapabilityName, max_jobs: int, request: "CapabilityRequestIF"):
def __init__(self, name: CapabilityName, max_jobs: Optional[int] = 2):
self.name = name
self.max_jobs = max_jobs
self.request = request
def create_request(self, locators: List[ProductLocator]) -> "CapabilityRequestIF":
def create_request(self):
"""
Create a capability request for this capability
:param locators: product locators for the new request
:return: a capability request
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityRequestIF(ABC):
"""
A capability request, which couples a capability to a product, representing
the expectation of a new product given a set of parameters
"""
def __init__(
self,
request_id: Optional[int],
capability: CapabilityIF,
info: "CapabilityInfoIF",
queue: "CapabilityQueueIF"
):
self.request_id = request_id
self.capability = capability
self.info = info
self.queue = queue
@property
def last_parameter(self) -> "ParameterIF":
return self.info.proj_settings.parameters[-1]
def create_exec_record(self, info: "CapabilityInfoIF", queue: "CapabilityQueueIF") -> "CapabilityExecutionIF":
return CapabilityExecutionIF(
# FIXME: Need a way to get from CapabilityRequest to WorkflowRequest
request=self,
proj_settings=info.proj_settings,
sequence=info.steps,
current_step=info.steps[0]
)
class CapabilityExecutionIF(ABC):
"""
An object representing an executed capability step that has already exited the
......@@ -84,12 +49,11 @@ class CapabilityExecutionIF(ABC):
request: "WorkflowRequestIF",
proj_settings: "ProjectSettingsIF",
sequence: List["CapabilityStepIF"],
current_step: "CapabilityStepIF",
):
self.request = request
self.proj_settings = proj_settings
self.sequence = sequence
self.current_step = current_step
self.current_step = sequence[0]
"""
......@@ -99,6 +63,67 @@ Capability System / Services
"""
class CapabilityInfoIF(ABC):
"""
Interface to stored capability information.
"""
def __init__(
self,
steps: List["CapabilityStepIF"],
matrix: "CapabilityMatrixIF",
proj_settings: "ProjectSettingsIF"
):
self.steps = steps
self.matrix = matrix
self.proj_settings = proj_settings
@abstractmethod
def lookup_capability(self, capability_name: str) -> CapabilityIF:
"""
Look up the definition of a capability.
:param capability_name: the name of the capability to find
:return: a capability
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequestIF:
"""
Look up a particular request
:param capability_request_id: the request identifier
:return: a capability request
FIXME: What to do with CapabilityRequestIF references?
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def save_request(self, request: CapabilityRequestIF) -> int:
"""
Save a capability request and return an integer identifier for it.
:param request: the request to save
:return: the request identifier
FIXME: What to do with CapabilityRequestIF references?
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityQueueIF(ABC):
"""
Organizes requests in a priority order and makes it possible to control
the number of concurrent executions or pause execution altogether
FIXME: What to do with CapabilityRequestIF references?
"""
def __init__(self, queue: PriorityQueue, max_concurrency: int):
self.queue = queue
self.max_concurrency = max_concurrency
@abstractmethod
def enqueue(self, request: CapabilityRequestIF):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityEngineIF(ABC):
"""
Executes a capability.
......@@ -151,6 +176,7 @@ class CapabilityServiceIF(ABC):
:param request: the request to execute
:return: None
FIXME: What to do with CapabilityRequestIF references?
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
......@@ -209,50 +235,6 @@ Capability System / Helpers
"""
class CapabilityInfoIF(ABC):
"""
Interface to stored capability information.
"""
def __init__(
self,
steps: List["CapabilityStepIF"],
matrix: "CapabilityMatrixIF",
proj_settings: "ProjectSettingsIF"
):
self.steps = steps
self.matrix = matrix
self.proj_settings = proj_settings
@abstractmethod
def lookup_capability(self, capability_name: str) -> CapabilityIF:
"""
Look up the definition of a capability.
:param capability_name: the name of the capability to find
:return: a capability
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequestIF:
"""
Look up a particular request
:param capability_request_id: the request identifier
:return: a capability request
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def save_request(self, request: CapabilityRequestIF) -> int:
"""
Save a capability request and return an integer identifier for it.
:param request: the request to save
:return: the request identifier
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityMatrixIF(ABC):
"""
Maps CASA versions to version-specific templates, allowing us to support a variety of
......@@ -288,45 +270,18 @@ class QueueRunnerIF(Thread, ABC):
pass
class CapabilityQueueIF(ABC):
"""
Organizes requests in a priority order and makes it possible to control
the number of concurrent executions or pause execution altogether
"""
def __init__(self, queue: PriorityQueue, max_concurrency: int):
self.queue = queue
self.max_concurrency = max_concurrency
@abstractmethod
def enqueue(self, request: CapabilityRequestIF):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class EventTranslatorIF(ABC):
"""
Translates a Workflow(?) event into data that a capability can understand
FIXME: What does this do exactly?
Translates a Workflow event into a Capability event
"""
pass
class CapabilityStepType(Enum):
"""
Enum that specifies the types of CapabilitySteps that are possible
"""
AwaitQA = 0
AwaitWorkflow = 1
AwaitProduct = 2
AwaitParameter = 3
AwaitLargeAllocApproval = 4
PrepareAndRunWorkflow = 5
class CapabilityStepIF(ABC):
"""
A step in a capability sequence
"""
def __init__(self, step_type: CapabilityStepType, next_step: Optional["CapabilityStepIF"]):
def __init__(self, step_type: "CapabilityStepType", next_step: Optional["CapabilityStepIF"]):
self.step_type = step_type
self.next = next_step
......@@ -347,7 +302,7 @@ class CapabilityEngineResponderIF(ABC):
Abstracts the callbacks for a capability engine
"""
@abstractmethod
def await_parameter(self, step: CapabilityStepIF, parameter_type: Type[ParameterIF]) -> ParameterIF:
def await_parameter(self, step: CapabilityStepIF, parameter_type: Type["ParameterIF"]) -> "ParameterIF":
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
......@@ -355,13 +310,62 @@ class CapabilityEngineResponderIF(ABC):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def prepare_and_run_workflow(self, step: CapabilityStepIF, name: str, param: ParameterIF, files: List[Path]):
def prepare_and_run_workflow(self, step: CapabilityStepIF, name: str, param: "ParameterIF", files: List[Path]):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class FutureCapabilityResultIF(ABC, ProductIF):
"""
Class that represents a product that will in the future result from a completed
capability execution
FIXME: What to do with CapabilityRequestIF references?
"""
def __init__(self, capability_request: CapabilityRequestIF):
self.capability_request = capability_request
class ParameterIF(ABC):
"""
Class that represents a capability parameter, a required input for the correct amd complete
execution of said capability
"""
pass
class TicketIF(ABC):
"""
Class that represents a JIRA ticket corresponding to the execution of a particular capability
FIXME: Should this class be in this file? It doesn't directly relate to capabilities.
"""
"""
Capabilities / Capability Step Types
FIXME: Do we need these classes? We have the enum below, but that might not be what we want.
FIXME: If we do want classes for each type, we need to add a couple more classes for the missing
step types.
"""
class CapabilityStepType(Enum):
"""
Enum that specifies the types of CapabilitySteps that are possible
"""
AwaitQA = 0
AwaitWorkflow = 1
AwaitProduct = 2
AwaitParameter = 3
AwaitLargeAllocApproval = 4
PrepareAndRunWorkflow = 5
class AwaitProductIF(CapabilityStepIF, ABC):
"""
Wait for a product to become available.
FIXME: What to do with CapabilityRequestIF references?
"""
# FIXME: Does this need to be a class attribute?
product: ProductLocator
......@@ -383,6 +387,7 @@ class AwaitProductIF(CapabilityStepIF, ABC):
class AwaitParameterIF(CapabilityStepIF, ABC):
"""
Wait for a certain parameter to arrive (probably from the UI).
FIXME: What to do with CapabilityRequestIF references?
"""
# FIXME: Does this need to be a class attribute?
parameter_type: Type[ParameterIF]
......@@ -397,6 +402,7 @@ class AwaitParameterIF(CapabilityStepIF, ABC):
class PrepareAndRunWorkflowIF(CapabilityStepIF, ABC):
"""
Render templates and execute a workflow, awaiting its completion.
FIXME: What to do with CapabilityRequestIF references?
"""
# FIXME: Does this need to be a class attribute?
workflow_name: str
......@@ -408,10 +414,3 @@ class PrepareAndRunWorkflowIF(CapabilityStepIF, ABC):
responder.prepare_and_run_workflow(self, self.workflow_name, request.last_parameter, request.files)
class FutureCapabilityResultIF(ABC, ProductIF):
"""
Class that represents a product that will in the future result from a completed
capability execution
"""
def __init__(self, capability_request: CapabilityRequestIF):
self.capability_request = capability_request
......@@ -15,7 +15,7 @@ from sqlalchemy.ext.declarative import declarative_base
from .capability_interfaces import CapabilityIF, CapabilityName, \
ProductLocator, CapabilityRequestIF, ParameterIF, CapabilityQueueIF, \
TicketIF, CapabilityInfoIF
TicketIF, CapabilityInfoIF, CapabilityExecutionIF
class AbstractFile:
......@@ -106,32 +106,20 @@ class Capability(Base, CapabilityIF):
"""
A capability, which is a particular workflow setup, intended to accept
a certain kind of product and some parameters and produce another product
FIXME: Needs to pull workflow steps, capability info, capability params, etc. from database
"""
__tablename__ = 'capabilities'
capability_name = sa.Column('capability_name', sa.String, primary_key=True)
max_jobs = sa.Column('max_jobs', sa.Integer)
def __init__(self, name: CapabilityName, max_jobs: int, locators: List[ProductLocator]):
self.name = name
self.max_jobs = max_jobs
self.locators = locators
self.request = self.create_request(self.locators)
def create_request(self, locators: List[ProductLocator]) -> CapabilityRequestIF:
# FIXME: Fill in correct parameter details
return CapabilityRequest(
capability=self,
id=None,
parameters=[],
files=[],
versions=[],
tickets=[],
queue=None
)
class CapabilityRequest(Base, CapabilityRequestIF):
def create_request(self):
"""
Create a new request entry in the database
FIXME: Implement
"""
pass
class CapabilityRequest(Base):
"""
A capability request, which couples a capability to a product, representing
the expectation of a new product given a set of parameters
......@@ -142,28 +130,18 @@ class CapabilityRequest(Base, CapabilityRequestIF):
sa.String,
sa.ForeignKey('capabilities.capability_name')
)
locators = sa.Column('product_locators', sa.String)
# FIXME: Should this be type JSON or type String or even some other type
parameters = sa.Column('parameters', sa.JSON)
files = sa.Column('files', sa.String)
versions = sa.Column('versions', sa.String)
# FIXME: 'tickets' table does not currently exist
tickets = sa.Column('tickets', sa.String, sa.ForeignKey('tickets.ticket_name'))
def __init__(
self,
capability: CapabilityIF,
info: CapabilityInfoIF,
queue: CapabilityQueueIF, # Do we need this here?
request_id: int = None,
):
self.capability = capability
self.parameters = info.proj_settings.parameters
self.files = info.proj_settings.files
self.versions = info.proj_settings.versions
self.tickets = info.proj_settings.tickets
self.queue = queue
self.id = request_id
class CapabilityExecution(CapabilityExecutionIF):
__tablename__ = 'capability_executions'
id = sa.Column('execution_id', sa.Integer, primary_key=True)
wf_request = sa.Column(
'workflow_request_id',
sa.Integer,
sa.ForeignKey('workflow_requests.workflow_request_id')
)
class Workflow(Base):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment