Skip to content
Snippets Groups Projects
Commit 77322a17 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Added interfaces from wksp0/ifaces.py

parent d574c87a
No related branches found
No related tags found
No related merge requests found
...@@ -4,9 +4,261 @@ Interfaces used by the Workspace system. ...@@ -4,9 +4,261 @@ Interfaces used by the Workspace system.
import inspect import inspect
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Dict, List from typing import Dict, List
ProductLocator = str
CapabilityName = str
@dataclass
class Capability(ABC):
name: CapabilityName
max_jobs: int
"""
A capability
"""
def create_request(self, locators: List[ProductLocator]):
"""
Create a capability request for this capability
:param locators: product locators for the new request
:return: a capability request
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class ProductService(ABC):
"""
Locate products and realize them on disk (haha).
"""
@abstractmethod
def locate_product(self, product_locator: ProductLocator) -> Path:
"""
Locates a given product and produces a file path to it.
:param product_locator: the locator to this product
:return: a path to this product
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@dataclass
class CapabilityRequest:
"""
A particular capability request
"""
capability: Capability
locators: List[ProductLocator]
id: Optional[int]
parameters: List["Parameter"]
files: List[Path]
@property
def last_parameter(self) -> "Parameter":
return self.parameters[-1]
class CapabilityInfo(ABC):
"""
Interface to stored capability information.
"""
@abstractmethod
def lookup_capability(self, capability_name: str) -> Capability:
"""
Look up the definition of a capability.
:param capability_name: the name of the capability to find
:return: a capability
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequest:
"""
Look up a particular request
:param capability_request_id: the request identifier
:return: a capability request
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def save_request(self, request: CapabilityRequest) -> int:
"""
Save a capability request and return an integer identifier for it.
:param request: the request to save
:return: the request identifier
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class QueueRunner(Thread, ABC):
pass
class CapabilityQueue(ABC):
"""
Holds capability requests until they can be executed.
"""
@abstractmethod
def enqueue(self, request: CapabilityRequest):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityService(ABC):
"""
The capability service: clients access this to request capability runs
"""
@abstractmethod
def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequest:
"""
Start a capability request with the given capability name and product locators.
:param name: the capability name to look things up with
:param locators: the products to start the capability with
:return: a new capability request
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def execute(self, request: CapabilityRequest) -> None:
"""
Begin executing a capability request
:param request: the request to execute
:return: None
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class ArchiveService(ABC):
"""
Abstracts services that are needed from the archive system.
"""
@abstractmethod
def lookup_product(self, locator: ProductLocator) -> ScienceProduct:
"""
Look up a science product by its locator
:param locator: science product locator for this product
:return: science product
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
FieldName = str
FieldLabel = str
class Parameter(ABC):
"""
Abstracts parameters needed for running capabilities.
"""
@staticmethod
def fields() -> Dict[FieldName, FieldLabel]:
raise NotImplementedError(f'Parameter.{inspect.stack()[0][3]}')
def json(self) -> Dict[str, str]:
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
def load(self, json: Dict[str, str]) -> None:
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityStep(ABC):
"""
A step in a capability sequence
"""
@abstractmethod
def execute_against(self, request: CapabilityRequest, responder: "CapabilityEngineResponder"):
"""
Execute this capability step
:return: None
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityEngineResponder(ABC):
"""
Abstracts the callbacks for a capability engine
"""
@abstractmethod
def await_parameter(self, step: CapabilityStep, parameter_type: Type[Parameter]) -> Parameter:
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def await_product(self, step: CapabilityStep, product_locator: ProductLocator):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def prepare_and_run_workflow(self, step: CapabilityStep, name: str, param: Parameter, files: List[Path]):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilitySequence(ABC):
"""
Represents the sequence of events in a capability.
"""
pass
class CapabilityEngine(ABC):
"""
Executes a capability.
"""
@abstractmethod
def execute(self, request):
pass
class AwaitProduct(CapabilityStep, ABC):
"""
Wait for a product to become available.
"""
product: ProductLocator
def __init__(self, product: Optional[ProductLocator]=None):
self.product = product
def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
# if we have a product, await it
if self.product:
request.files.append(responder.await_product(self, self.product))
# if we do not, await the locators on the request itself
else:
for locator in request.locators:
request.files.append(responder.await_product(self, locator))
class AwaitParameter(CapabilityStep, ABC):
"""
Wait for a certain parameter to arrive (probably from the UI).
"""
parameter_type: Type[Parameter]
def __init__(self, parameter_type: Type[Parameter]):
self.parameter_type = parameter_type
def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
request.parameters.append(responder.await_parameter(self, self.parameter_type))
class PrepareAndRunWorkflow(CapabilityStep, ABC):
"""
Render templates and execute a workflow, awaiting its completion.
"""
workflow_name: str
def __init__(self, workflow_name: str):
self.workflow_name = workflow_name
def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
responder.prepare_and_run_workflow(self, self.workflow_name, request.last_parameter, request.files)
class WorkflowServiceIF(ABC): class WorkflowServiceIF(ABC):
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment