Skip to content
Snippets Groups Projects
Commit 6c353c24 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Added some inits; added Product classes from Daniel's diagram; added

comments and reorganized the order of the classes
parent 83047c8c
No related branches found
No related tags found
No related merge requests found
......@@ -4,6 +4,7 @@ Interfaces used by the Workspace system.
import inspect
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
from queue import PriorityQueue
from threading import Thread
......@@ -13,14 +14,22 @@ ProductLocator = str
CapabilityName = str
"""
Capability System / Persistent Entities
"""
class CapabilityIF(ABC):
"""
A capability, which is a particular workflow setup, intended to accept
a certain kind of product and some parameters and produce another product
"""
name: CapabilityName
max_jobs: int
request: "CapabilityRequestIF"
def __init__(self, name: CapabilityName, max_jobs: int, request: "CapabilityRequestIF"):
self.name = name
self.max_jobs = max_jobs
self.request = request
def create_request(self, locators: List[ProductLocator]) -> "CapabilityRequestIF":
"""
......@@ -32,29 +41,22 @@ class CapabilityIF(ABC):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class ProductServiceIF(ABC):
"""
Locate products and realize them on disk (haha).
"""
@abstractmethod
def locate_product(self, product_locator: ProductLocator) -> Path:
"""
Locates a given product and produces a file path to it.
:param product_locator: the locator to this product
:return: a path to this product
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityRequestIF(ABC):
"""
A capability request, which couples a capability to a product, representing
the expectation of a new product given a set of parameters
"""
request_id: Optional[int]
capability: CapabilityIF
info: "CapabilityInfoIF"
queue: "CapabilityQueueIF"
def __init__(
self,
request_id: Optional[int],
capability: CapabilityIF,
info: "CapabilityInfoIF",
queue: "CapabilityQueueIF"
):
self.request_id = request_id
self.capability = capability
self.info = info
self.queue = queue
@property
def last_parameter(self) -> "ParameterIF":
......@@ -70,85 +72,61 @@ class CapabilityRequestIF(ABC):
)
class CapabilityInfoIF(ABC):
class CapabilityExecutionIF(ABC):
"""
Interface to stored capability information.
An object representing an executed capability step that has already exited the
queue after being executed
"""
steps: List["CapabilityStepIF"]
matrix: "CapabilityMatrixIF"
proj_settings: "ProjectSettingsIF"
@abstractmethod
def lookup_capability(self, capability_name: str) -> CapabilityIF:
"""
Look up the definition of a capability.
:param capability_name: the name of the capability to find
:return: a capability
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequestIF:
"""
Look up a particular request
:param capability_request_id: the request identifier
:return: a capability request
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
def __init__(
self,
request: "WorkflowRequestIF",
proj_settings: "ProjectSettingsIF",
sequence: List["CapabilityStepIF"],
current_step: "CapabilityStepIF",
):
self.request = request
self.proj_settings = proj_settings
self.sequence = sequence
self.current_step = current_step
@abstractmethod
def save_request(self, request: CapabilityRequestIF) -> int:
"""
Save a capability request and return an integer identifier for it.
:param request: the request to save
:return: the request identifier
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
"""
Capability System / Services
class CapabilityMatrixIF(ABC):
pass
"""
class ProjectSettingsIF(ABC):
class CapabilityEngineIF(ABC):
"""
Stores relevant information about a particular execution of a capability
Executes a capability.
"""
parameters: List["ParameterIF"]
files: List[Path]
# FIXME: Will versions be str or a class of their own?
versions: List[str]
tickets: List["TicketIF"]
@abstractmethod
def execute(self, request):
pass
class QueueRunnerIF(Thread, ABC):
class CapabilityFactoryIF(ABC):
pass
class CapabilityQueueIF(ABC):
"""
Organizes requests in a priority order and makes it possible to control
the number of concurrent executions or pause execution altogether
"""
queue: PriorityQueue
max_concurrency: int
@abstractmethod
def enqueue(self, request: CapabilityRequestIF):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityServiceIF(ABC):
"""
The capability service: clients access this to request capability runs
"""
event_translator: "EventTranslatorIF"
queue: CapabilityQueueIF
info: CapabilityInfoIF
estimation_service: "EstimationServiceIF"
metrics_service: "MetricsServiceIF"
def __init__(
self,
event_translator: "EventTranslatorIF",
queue: "CapabilityQueueIF",
info: "CapabilityInfoIF",
estimation_service: "EstimationServiceIF",
metrics_service: "MetricsServiceIF"
):
self.event_translator = event_translator
self.queue = queue
self.info = info
self.estimation_service = estimation_service
self.metrics_service = metrics_service
@abstractmethod
def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequestIF:
......@@ -172,8 +150,18 @@ class CapabilityServiceIF(ABC):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityFactoryIF(ABC):
pass
class ProductServiceIF(ABC):
"""
Locate products and realize them on disk (haha).
"""
@abstractmethod
def locate_product(self, product_locator: ProductLocator) -> Path:
"""
Locates a given product and produces a file path to it.
:param product_locator: the locator to this product
:return: a path to this product
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class EstimationServiceIF(ABC):
......@@ -189,7 +177,7 @@ class ArchiveServiceIF(ABC):
Abstracts services that are needed from the archive system.
"""
@abstractmethod
def lookup_product(self, locator: ProductLocator) -> "ScienceProductIF":
def lookup_product(self, locator: ProductLocator) -> "ProductIF":
"""
Look up a science product by its locator
:param locator: science product locator for this product
......@@ -202,49 +190,130 @@ FieldName = str
FieldLabel = str
class EventTranslatorIF(ABC):
pass
"""
Capability System / Helpers
"""
class ParameterIF(ABC):
class CapabilityInfoIF(ABC):
"""
Abstracts parameter needed for running capabilities
Interface to stored capability information.
"""
@staticmethod
def __init__(
self,
steps: List["CapabilityStepIF"],
matrix: "CapabilityMatrixIF",
proj_settings: "ProjectSettingsIF"
):
self.steps = steps
self.matrix = matrix
self.proj_settings = proj_settings
@abstractmethod
def fields() -> Dict[FieldName, FieldLabel]:
raise NotImplementedError(f'Parameter.{inspect.stack()[0][3]}')
def lookup_capability(self, capability_name: str) -> CapabilityIF:
"""
Look up the definition of a capability.
:param capability_name: the name of the capability to find
:return: a capability
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def json(self) -> Dict[str, str]:
def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequestIF:
"""
Look up a particular request
:param capability_request_id: the request identifier
:return: a capability request
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def load(self, json: Dict[str, str]) -> None:
def save_request(self, request: CapabilityRequestIF) -> int:
"""
Save a capability request and return an integer identifier for it.
:param request: the request to save
:return: the request identifier
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class TicketIF(ABC):
class CapabilityMatrixIF(ABC):
"""
Maps CASA versions to version-specific templates, allowing us to support a variety of
CASA versions
"""
pass
class ScienceProductIF(ABC):
class ProjectSettingsIF(ABC):
"""
Stores relevant information about a particular execution of a capability
"""
def __init__(
self,
parameters: List["ParameterIF"],
files: List[Path],
# FIXME: Will versions be str or a class of their own?
versions: List[str],
tickets: List["TicketIF"]
):
self.parameters = parameters
self.files = files
self.versions = versions
self.tickets = tickets
class QueueRunnerIF(Thread, ABC):
"""
"""
pass
class CapabilityQueueIF(ABC):
"""
Organizes requests in a priority order and makes it possible to control
the number of concurrent executions or pause execution altogether
"""
def __init__(self, queue: PriorityQueue, max_concurrency: int):
self.queue = queue
self.max_concurrency = max_concurrency
@abstractmethod
def enqueue(self, request: CapabilityRequestIF):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class EventTranslatorIF(ABC):
pass
class CapabilityStepType(Enum):
AwaitQA = 0
AwaitWorkflow = 1
AwaitProduct = 2
AwaitParameter = 3
AwaitLargeAllocApproval = 4
PrepareAndRunWorkflow = 5
class CapabilityStepIF(ABC):
"""
A step in a capability sequence
Types of steps:
1. Await QA
2. Await workflow
3. Await product
4. Prepare and run workflow
"""
next: "CapabilityStepIF"
def __init__(self, step_type: CapabilityStepType, next_step: Optional["CapabilityStepIF"]):
self.step_type = step_type
self.next = next_step
@abstractmethod
def execute_against(self, request: CapabilityRequestIF, responder: "CapabilityEngineResponderIF"):
def execute_against(
self, request: CapabilityRequestIF,
responder: "CapabilityEngineResponderIF"
):
"""
Execute this capability step
:return: None
......@@ -252,24 +321,6 @@ class CapabilityStepIF(ABC):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityExecutionIF(ABC):
"""
An object representing an executed capability step that has already exited the
queue after being executed
"""
def __init__(
self,
request: "WorkflowRequestIF",
proj_settings: ProjectSettingsIF,
sequence: List[CapabilityStepIF],
current_step: CapabilityStepIF,
):
self.request = request
self.proj_settings = proj_settings
self.sequence = sequence
self.current_step = current_step
class CapabilityEngineResponderIF(ABC):
"""
Abstracts the callbacks for a capability engine
......@@ -287,15 +338,6 @@ class CapabilityEngineResponderIF(ABC):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityEngineIF(ABC):
"""
Executes a capability.
"""
@abstractmethod
def execute(self, request):
pass
class AwaitProductIF(CapabilityStepIF, ABC):
"""
Wait for a product to become available.
......@@ -345,6 +387,68 @@ class PrepareAndRunWorkflowIF(CapabilityStepIF, ABC):
responder.prepare_and_run_workflow(self, self.workflow_name, request.last_parameter, request.files)
"""
Products / Persistent Entities
"""
class CurrentArchiveProductIF(ABC, "ProductIF"):
def __init__(self, product_locator: ProductLocator):
self.product_locator = product_locator
"""
Products / Helpers
"""
class ProductIF(ABC):
# FIXME: Needs attributes
def __init__(self):
pass
class FutureCapabilityResultIF(ABC, ProductIF):
def __init__(self, capability_request: CapabilityRequestIF):
self.capability_request = capability_request
class FutureArchiveProductIF(ABC, ProductIF):
# FIXME: Implement ProductType
def __init__(self, future_product_type: "FutureProductTypeIF"):
self.type = future_product_type
class FutureProductTypeIF(ABC):
pass
class FutureExecutionBlockIF(ABC, FutureProductTypeIF):
pass
class FutureProductIF(ABC, FutureProductTypeIF):
def __init__(self, product: Optional[ProductIF], product_type: Optional["ProductTypeIF"]):
self.product = product
self.product_type = product_type
class ProductTypeIF(ABC):
# FIXME: We haven't decided how we want to implement this yet
pass
"""
Workflow System
"""
class WorkflowServiceIF(ABC):
"""
Executes workflows; should be a freestanding service.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment